/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.LossAction;
import org.apache.geode.cache.MembershipAttributes;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionAccessException;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionDistributionException;
import org.apache.geode.cache.RegionMembershipListener;
import org.apache.geode.cache.ResumptionAction;
import org.apache.geode.cache.RoleException;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.Role;
import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionAdvisor.ProfileVisitor;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.locks.DLockRemoteToken;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.InitialImageOperation.GIIStatus;
import org.apache.geode.internal.cache.RegionMap.ARMLockTestHook;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryEvent;
import org.apache.geode.internal.cache.event.DistributedEventTracker;
import org.apache.geode.internal.cache.event.EventTracker;
import org.apache.geode.internal.cache.eviction.EvictableEntry;
import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor;
import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultSender;
import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultWaiter;
import org.apache.geode.internal.cache.execute.LocalResultCollector;
import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
import org.apache.geode.internal.cache.execute.metrics.FunctionStats;
import org.apache.geode.internal.cache.execute.metrics.FunctionStatsManager;
import org.apache.geode.internal.cache.persistence.CreatePersistentRegionProcessor;
import org.apache.geode.internal.cache.persistence.PersistenceAdvisor;
import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
import org.apache.geode.internal.cache.persistence.PersistentMemberView;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.RemoteClearMessage;
import org.apache.geode.internal.cache.tx.RemoteDestroyMessage;
import org.apache.geode.internal.cache.tx.RemoteFetchVersionMessage;
import org.apache.geode.internal.cache.tx.RemoteFetchVersionMessage.FetchVersionResponse;
import org.apache.geode.internal.cache.tx.RemoteInvalidateMessage;
import org.apache.geode.internal.cache.tx.RemotePutMessage;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.RegionVersionHolder;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.sequencelog.RegionLogger;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
@SuppressWarnings("deprecation")
public class DistributedRegion extends LocalRegion implements InternalDistributedRegion {
private static final Logger logger = LogService.getLogger();
/** causes cache profile to be added to afterRemoteRegionCreate notification for testing */
@MutableForTesting
public static boolean TEST_HOOK_ADD_PROFILE = false;
/** Used to sync accesses to this.dlockService to allow lazy construction */
private final Object dlockMonitor = new Object();
final CacheDistributionAdvisor distAdvisor;
private final SenderIdMonitor senderIdMonitor;
/**
* GuardedBy {@link #dlockMonitor}
*/
private DistributedLockService dlockService;
final AdvisorListener advisorListener = new AdvisorListener();
/** Set of currently missing required roles */
private final HashSet<Role> missingRequiredRoles = new HashSet<>();
/** True if this region is currently missing any required roles */
private volatile boolean isMissingRequiredRoles = false;
/**
* True if this region has any required roles defined and the LossAction is either NO_ACCESS or
* LIMITED_ACCESS. Reliability checks will only happen if this is true.
*/
private final boolean requiresReliabilityCheck;
/**
* Latch that is opened after initialization has waited for required roles, for up to the
* <a href="DistributedSystem#member-timeout">member-timeout</a>.
*/
private final StoppableCountDownLatch initializationLatchAfterMemberTimeout;
private final PersistenceAdvisor persistenceAdvisor;
private final PersistentMemberID persistentId;
/**
* This boolean is set to false when this region is non-persistent but there are persistent
* members in the distributed system to which all region modifications should be forwarded; see
* bug 45186.
*/
private volatile boolean generateVersionTag = true;
/** Tests can set this to true and ignore reliability triggered reconnects */
@MutableForTesting
public static boolean ignoreReconnect = false;
/**
* Lock to prevent multiple threads on this member from performing a clear at the same time.
*/
private final Object clearLock = new Object();
private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true);
@MakeNotStatic
private static final AtomicBoolean loggedNetworkPartitionWarning = new AtomicBoolean(false);
/** Creates a new instance of DistributedRegion */
protected DistributedRegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion,
InternalCache cache, InternalRegionArguments internalRegionArgs,
StatisticsClock statisticsClock) {
super(regionName, attrs, parentRegion, cache, internalRegionArgs, statisticsClock);
initializationLatchAfterMemberTimeout =
new StoppableCountDownLatch(getCancelCriterion(), 1);
distAdvisor = createDistributionAdvisor(internalRegionArgs);
senderIdMonitor = createSenderIdMonitor();
if (getDistributionManager().getConfig().getEnableNetworkPartitionDetection()
&& !isInternalRegion() && !attrs.getScope().isAck() && !doesNotDistribute()
&& attrs.getDataPolicy().withStorage()) {
logger.warn(
"Region {} is being created with scope {} but enable-network-partition-detection is enabled in the distributed system. This can lead to cache inconsistencies if there is a network failure.",
new Object[] {regionName, attrs.getScope()});
}
if (!getDistributionManager().getConfig().getEnableNetworkPartitionDetection()
&& attrs.getDataPolicy().withPersistence()
&& !loggedNetworkPartitionWarning.getAndSet(true)) {
logger.warn(
"Creating persistent region {}, but enable-network-partition-detection is set to false. Running with network partition detection disabled can lead to an unrecoverable system in the event of a network split.",
regionName);
}
boolean setRequiresReliabilityCheck = attrs.getMembershipAttributes().hasRequiredRoles() &&
// note that the following includes NO_ACCESS and LIMITED_ACCESS
!attrs.getMembershipAttributes().getLossAction().isAllAccess()
&& !attrs.getMembershipAttributes().getLossAction().isReconnect();
// this optimization is safe as long as Roles and Required Roles are
// immutable
// if this VM fulfills all required roles, make requiresReliabilityCheck
// false
Set<Role> reqRoles = new HashSet<>(attrs.getMembershipAttributes().getRequiredRoles());
reqRoles.removeAll(getSystem().getDistributedMember().getRoles());
if (reqRoles.isEmpty()) {
setRequiresReliabilityCheck = false;
}
requiresReliabilityCheck = setRequiresReliabilityCheck;
if (internalRegionArgs.isUsedForPartitionedRegionBucket()) {
persistenceAdvisor = internalRegionArgs.getPersistenceAdvisor();
} else if (allowsPersistence()) {
// TODO: prpersist - using this lock service is a hack. Maybe? Or maybe
// it's ok if we have one (rarely used) lock service for many operations?
// What does the resource manager do?
DistributedLockService dl = cache.getPartitionedRegionLockService();
try {
// TODO: prpersist - this is just a quick and dirty storage mechanism so that
// I can test the storage.
DiskRegionStats diskStats;
PersistentMemberView storage;
if (getDataPolicy().withPersistence()) {
storage = getDiskRegion();
diskStats = getDiskRegion().getStats();
} else {
storage = new InMemoryPersistentMemberView();
diskStats = null;
}
PersistentMemberManager memberManager = cache.getPersistentMemberManager();
persistenceAdvisor = new PersistenceAdvisorImpl(distAdvisor, dl, storage,
getFullPath(), diskStats, memberManager);
} catch (Exception e) {
throw new InternalGemFireError("Couldn't recover persistence", e);
}
} else {
persistenceAdvisor = null;
}
if (persistenceAdvisor != null) {
persistentId = persistenceAdvisor.generatePersistentID();
} else {
persistentId = null;
}
}
@Override
protected EventTracker createEventTracker() {
EventTracker tracker = new DistributedEventTracker(cache, getCancelCriterion(), getName());
tracker.start();
return tracker;
}
/**
* Record the event state from the image provider
*
* @param provider the member that provided the initial image and event state
*/
protected void recordEventStateFromImageProvider(InternalDistributedMember provider) {
// No-op. Only BucketRegion initiates event state
}
/**
* Intended for use during construction of a DistributedRegion
*
* @return the advisor to be used by the region
*/
protected CacheDistributionAdvisor createDistributionAdvisor(
InternalRegionArguments internalRegionArgs) {
// Warning: potential early escape of object before full construction
return CacheDistributionAdvisor.createCacheDistributionAdvisor(this);
}
/**
* Does this region support persistence?
*/
public boolean allowsPersistence() {
return true;
}
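/**
 * Returns true when a locally originated operation on a missing entry must be sent to a
 * replicate (a "one-hop") so that a valid version tag can be obtained before the local cache
 * is modified.
 */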
@Override
boolean requiresOneHopForMissingEntry(EntryEventImpl event) {
// received from another member - don't use one-hop
if (event.isOriginRemote()) {
return false;
}
// local ops aren't distributed
if (event.getOperation().isLocal()) {
return false;
}
// if it already has a valid version tag it can go out with a DistributedCacheOperation
if (event.getVersionTag() != null && event.getVersionTag().getRegionVersion() > 0) {
return false;
}
// if we're not allowed to generate a version tag we need to send it to someone who can
if (!generateVersionTag) {
return true;
}
return getConcurrencyChecksEnabled() && (getServerProxy() == null) && !isTX()
&& scope.isDistributed() && !getDataPolicy().withReplication();
}
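/**
 * Performs a put, first taking the distributed lock when the region has GLOBAL scope and the
 * operation originates locally. If the entry is missing and this member cannot generate a
 * version tag, the operation is distributed one-hop via RemotePutMessage before being applied;
 * replayed events are re-distributed and have their callbacks invoked without re-applying the
 * value.
 */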
@Override
public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
boolean overwriteDestroyed) throws TimeoutException, CacheWriterException {
final boolean isTraceEnabled = logger.isTraceEnabled();
Lock dlock = null;
if (scope.isGlobal() && // lock only applies to global scope
!event.isOriginRemote() && // only if operation originating locally
!event.isNetSearch() && // search and load processor handles own locking
!event.isNetLoad() &&
// @todo darrel/kirk: what about putAll?
!event.isLocalLoad() && !event.isSingleHopPutOp()) {
// Single Hop Op means dlock is already taken at origin node.
dlock = getDistributedLockIfGlobal(event.getKey());
}
if (isTraceEnabled) {
logger.trace("virtualPut invoked for event {}", event);
}
try {
if (!hasSeenEvent(event)) {
if (requiresOneHopForMissingEntry(event)) {
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null /* || re.isTombstone() */ || !generateVersionTag) {
if (!event.isBulkOpInProgress() || getDataPolicy().withStorage()) {
// putAll will send a single one-hop for empty regions. for other missing entries
// we need to get a valid version number before modifying the local cache
boolean didDistribute = RemotePutMessage.distribute(event, lastModified, false, false,
expectedOldValue, requireOldValue, !generateVersionTag);
if (!didDistribute && isTraceEnabled) {
logger.trace("Unable to perform one-hop messaging");
}
if (!generateVersionTag && !didDistribute) {
throw new PersistentReplicatesOfflineException();
}
if (didDistribute) {
if (isTraceEnabled) {
logger.trace("Event after remotePut operation: {}", event);
}
if (event.getVersionTag() == null) {
// if the event wasn't applied by the one-hop replicate it will not have a version
// tag
// and so should not be applied to this cache
return false;
}
}
}
}
}
return super.virtualPut(event, ifNew, ifOld, expectedOldValue, requireOldValue,
lastModified, overwriteDestroyed);
} else {
if (event.getDeltaBytes() != null && event.getRawNewValue() == null) {
// This means that this event has delta bytes but no full value.
// Request the full value of this event.
// The value in this vm may not be same as this event's value.
throw new InvalidDeltaException(
"Cache encountered replay of event containing delta bytes for key " + event.getKey());
}
// if the listeners have already seen this event, then it has already
// been successfully applied to the cache, so distribute the message and
// return
if (logger.isDebugEnabled()) {
logger.debug("DR.virtualPut: this cache has already seen this event {}", event);
}
// Fix 39014: when hasSeenEvent, put will still distribute the
// event, but putAll did not. We add that logic back here rather than in
// DR.distributeUpdate() because we moved this part up into
// LR.basicPutPart3 on purpose.
if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
event.getPutAllOperation().addEntry(event, true);
}
/*
* doing this so that other VMs will apply this no matter what. If it is an "update" they
* will not apply it if they don't have the key. Because this is probably a retry, it will
* never get applied to this local AbstractRegionMap, and so will never be flipped to a
* 'create'
*/
event.makeCreate();
if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
event.invokeCallbacks(this, true, true);
}
return true;
}
} finally {
if (dlock != null) {
dlock.unlock();
}
}
}
@Override
RegionEntry basicPutEntry(EntryEventImpl event, long lastModified)
throws TimeoutException, CacheWriterException {
final boolean isTraceEnabled = logger.isTraceEnabled();
if (isTraceEnabled) {
logger.trace("basicPutEntry invoked for event {}", event);
}
if (requiresOneHopForMissingEntry(event)) {
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null /* || re.isTombstone() */ || !generateVersionTag) {
final boolean ifNew = false;
final boolean ifOld = false;
boolean didDistribute = RemotePutMessage.distribute(event, lastModified, ifNew, ifOld, null,
false, !generateVersionTag);
if (!generateVersionTag && !didDistribute) {
throw new PersistentReplicatesOfflineException();
}
if (didDistribute && isTraceEnabled) {
logger.trace("Event after remotePut for basicPutEntry: {}", event);
}
}
}
return super.basicPutEntry(event, lastModified);
}
@Override
void performPutAllEntry(EntryEventImpl event) {
/*
* force shared data view so that we just do the virtual op, accruing things in the put all
* operation for later
*/
if (isTX()) {
event.getPutAllOperation().addEntry(event);
} else {
getSharedDataView().putEntry(event, false, false, null, false, 0L, false);
}
}
@Override
void performRemoveAllEntry(EntryEventImpl event) {
// force shared data view so that we just do the virtual op, accruing things in the bulk
// operation for later
if (isTX()) {
event.getRemoveAllOperation().addEntry(event);
} else {
basicDestroy(event, true, null);
}
}
/**
* distribution and listener notification
*/
@Override
public void basicPutPart3(EntryEventImpl event, RegionEntry entry, boolean isInitialized,
long lastModified, boolean invokeCallbacks, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue) {
distributeUpdate(event, lastModified, false, false, null, false);
super.basicPutPart3(event, entry, isInitialized, lastModified, invokeCallbacks, ifNew, ifOld,
expectedOldValue, requireOldValue);
}
/** distribute an update operation */
protected void distributeUpdate(EntryEventImpl event, long lastModified, boolean ifNew,
boolean ifOld, Object expectedOldValue, boolean requireOldValue) {
// an update from a netSearch is not distributed
if (!event.isOriginRemote() && !event.isNetSearch() && !event.isBulkOpInProgress()) {
boolean distribute = true;
if (event.getInhibitDistribution()) {
// this has already been distributed by a one-hop operation
distribute = false;
}
if (distribute) {
// before distribute: DR's put has already notified the gateway sender earlier
UpdateOperation op = new UpdateOperation(event, lastModified);
if (logger.isTraceEnabled()) {
logger.trace("distributing operation for event : {} : for region : {}", event,
getName());
}
op.distribute();
}
}
}
@Override
public boolean hasSeenEvent(EntryEventImpl event) {
boolean isDuplicate;
isDuplicate = getEventTracker().hasSeenEvent(event);
if (isDuplicate) {
markEventAsDuplicate(event);
} else {
// bug #48205 - a retried PR operation may already have a version assigned to it
// in another VM
if (event.isPossibleDuplicate() && event.getRegion().getConcurrencyChecksEnabled()
&& event.getVersionTag() == null && event.getEventId() != null) {
boolean isBulkOp = event.getOperation().isPutAll() || event.getOperation().isRemoveAll();
VersionTag tag =
FindVersionTagOperation.findVersionTag(event.getRegion(), event.getEventId(), isBulkOp);
event.setVersionTag(tag);
}
}
return isDuplicate;
}
private void markEventAsDuplicate(EntryEventImpl event) {
event.setPossibleDuplicate(true);
if (getConcurrencyChecksEnabled() && event.getVersionTag() == null) {
if (event.isBulkOpInProgress()) {
event.setVersionTag(getEventTracker().findVersionTagForBulkOp(event.getEventId()));
} else {
event.setVersionTag(getEventTracker().findVersionTagForSequence(event.getEventId()));
}
}
}
void setGeneratedVersionTag(boolean generateVersionTag) {
// there is at least one other persistent member, so turn on concurrencyChecks
enableConcurrencyChecks();
this.generateVersionTag = generateVersionTag;
}
protected boolean getGenerateVersionTag() {
return generateVersionTag;
}
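/**
 * Decides whether this member should generate a version tag for the given event. Clients,
 * EMPTY regions, retried events that already carry a non-gateway version tag, and local
 * operations do not generate one; locally originated events on replicated regions do, while
 * non-replicated, non-persistent regions only stamp entries that already have a valid version.
 */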
@Override
boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) {
if (logger.isTraceEnabled()) {
logger.trace(
"shouldGenerateVersionTag this.generateVersionTag={} ccenabled={} dataPolicy={} event:{}",
generateVersionTag, getConcurrencyChecksEnabled(), getDataPolicy(), event);
}
if (!getConcurrencyChecksEnabled() || getDataPolicy() == DataPolicy.EMPTY
|| !generateVersionTag) {
return false;
}
if (getServerProxy() != null) { // client
return false;
}
if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
if (logger.isDebugEnabled()) {
logger.debug("Not to create a new version tag for retried event {}", event);
}
return false;
}
if (event.getOperation().isLocal()) { // bug #45402 - localDestroy generated a version tag
return false;
}
if (!event.isOriginRemote() && getDataPolicy().withReplication()) {
return true;
}
if (!getDataPolicy().withReplication() && !getDataPolicy().withPersistence()) {
// do not generate a version stamp in a region that has no replication if it's not based
// on an existing version from a replicate region
return entry.getVersionStamp().hasValidVersion();
}
if (!event.isOriginRemote() && event.getDistributedMember() != null) {
if (!event.getDistributedMember().equals(getMyId())) {
return event.getVersionTag() == null; // one-hop remote message
}
}
return false;
}
/**
* Throws RegionAccessException if required roles are missing and the LossAction is NO_ACCESS
*
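* <p>
* A minimal caller-side sketch (hypothetical region, key, and value variables):
*
* <pre>{@code
* try {
*   region.put(key, value);
* } catch (RegionAccessException e) {
*   // required roles are missing and the LossAction is NO_ACCESS; back off or retry later
* }
* }</pre>
*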
* @throws RegionAccessException if required roles are missing and the LossAction is NO_ACCESS
*/
@Override
protected void checkForNoAccess() {
if (requiresReliabilityCheck && isMissingRequiredRoles) {
if (getMembershipAttributes().getLossAction().isNoAccess()) {
synchronized (missingRequiredRoles) {
if (!isMissingRequiredRoles)
return;
Set<Role> roles = Collections.unmodifiableSet(new HashSet<>(missingRequiredRoles));
throw new RegionAccessException(
String.format(
"Operation is disallowed by LossAction %s because these required roles are missing: %s.",
getMembershipAttributes().getLossAction(), roles),
getFullPath(), roles);
}
}
}
}
/**
* Throws RegionAccessException if required roles are missing and the LossAction is either
* NO_ACCESS or LIMITED_ACCESS.
*
* @throws RegionAccessException if required roles are missing and the LossAction is either
* NO_ACCESS or LIMITED_ACCESS
*/
@Override
public void checkForLimitedOrNoAccess() {
if (requiresReliabilityCheck && isMissingRequiredRoles) {
if (getMembershipAttributes().getLossAction().isNoAccess()
|| getMembershipAttributes().getLossAction().isLimitedAccess()) {
synchronized (missingRequiredRoles) {
if (!isMissingRequiredRoles)
return;
Set<Role> roles = Collections.unmodifiableSet(new HashSet<>(missingRequiredRoles));
Assert.assertTrue(!roles.isEmpty());
throw new RegionAccessException(
String.format(
"Operation is disallowed by LossAction %s because these required roles are missing: %s.",
getMembershipAttributes().getLossAction(), roles),
getFullPath(), roles);
}
}
}
}
@Override
public void handleReliableDistribution(Set successfulRecipients) {
handleReliableDistribution(successfulRecipients, Collections.emptySet(),
Collections.emptySet());
}
private void handleReliableDistribution(Set successfulRecipients, Set otherRecipients1,
Set otherRecipients2) {
if (requiresReliabilityCheck) {
MembershipAttributes ra = getMembershipAttributes();
// determine the successful roles
Set<Role> roles = new HashSet<>();
for (Object successfulRecipient : successfulRecipients) {
InternalDistributedMember mbr = (InternalDistributedMember) successfulRecipient;
if (mbr != null) {
roles.addAll(mbr.getRoles());
}
}
for (Object anOtherRecipients1 : otherRecipients1) {
InternalDistributedMember mbr = (InternalDistributedMember) anOtherRecipients1;
if (mbr != null) {
roles.addAll(mbr.getRoles());
}
}
for (Object anOtherRecipients2 : otherRecipients2) {
InternalDistributedMember mbr = (InternalDistributedMember) anOtherRecipients2;
if (mbr != null) {
roles.addAll(mbr.getRoles());
}
}
// determine the missing roles
Set<Role> failedRoles = new HashSet<>(ra.getRequiredRoles());
failedRoles.removeAll(roles);
if (failedRoles.isEmpty()) {
return;
}
throw new RegionDistributionException(
String.format("Operation distribution may have failed to notify these required roles: %s",
failedRoles),
getFullPath(), failedRoles);
}
}
/**
* Called when we do a distributed operation and don't have anyone to distribute it to. Since
* this is only called when no distribution was done (i.e. no recipients) we do not check
* isMissingRequiredRoles because it might no longer be true due to race conditions.
*
* @return false if this region has at least one required role and queuing is configured. Returns
* true if sending to no one is ok.
* @throws RoleException if a required role is missing and the LossAction is either NO_ACCESS or
* LIMITED_ACCESS.
* @since GemFire 5.0
*/
boolean isNoDistributionOk() {
if (requiresReliabilityCheck) {
MembershipAttributes ra = getMembershipAttributes();
Set<Role> failedRoles = ra.getRequiredRoles();
throw new RegionDistributionException(
String.format("Operation distribution was not done to these required roles: %s",
failedRoles),
getFullPath(), failedRoles);
}
return true;
}
/**
* Returns true if this Region does not distribute its operations to other members.
*
* @since GemFire 6.0
* @see HARegion#localDestroyNoCallbacks(Object)
*/
public boolean doesNotDistribute() {
return false;
}
@Override
public boolean shouldSyncForCrashedMember(InternalDistributedMember id) {
return !doesNotDistribute() && super.shouldSyncForCrashedMember(id);
}
@Override
public boolean requiresReliabilityCheck() {
return requiresReliabilityCheck;
}
/**
* Returns true if the ExpiryTask is currently allowed to expire.
* <p>
* If the region is in NO_ACCESS due to reliability configuration, then no expiration actions are
* allowed.
* <p>
* If the region is in LIMITED_ACCESS due to reliability configuration, then only non-distributed
* expiration actions are allowed.
*/
@Override
protected boolean isExpirationAllowed(ExpiryTask expiry) {
if (requiresReliabilityCheck && isMissingRequiredRoles) {
if (getMembershipAttributes().getLossAction().isNoAccess()) {
return false;
}
return !getMembershipAttributes().getLossAction().isLimitedAccess()
|| !expiry.isDistributedAction();
}
return true;
}
/**
* Performs the resumption action when reliability is resumed.
*
* @return true if asynchronous resumption is triggered
*/
private boolean resumeReliability(InternalDistributedMember id, Set newlyAcquiredRoles) {
boolean async = false;
try {
ResumptionAction ra = getMembershipAttributes().getResumptionAction();
if (ra.isNone()) {
if (logger.isDebugEnabled()) {
logger.debug("Reliability resumption for action of none");
}
resumeExpiration();
} else if (ra.isReinitialize()) {
async = true;
asyncResumeReliability(id, newlyAcquiredRoles);
}
} catch (Exception e) {
logger.fatal("Unexpected exception:",
e);
}
return async;
}
/**
* Handles asynchronous ResumptionActions such as region reinitialize.
*/
private void asyncResumeReliability(final InternalDistributedMember id,
final Set newlyAcquiredRoles) throws RejectedExecutionException {
final ResumptionAction ra = getMembershipAttributes().getResumptionAction();
getDistributionManager().getExecutors().getWaitingThreadPool().execute(() -> {
try {
if (ra.isReinitialize()) {
if (logger.isDebugEnabled()) {
logger.debug("Reliability resumption for action of reinitialize");
}
if (!isDestroyed() && !cache.isClosed()) {
RegionEventImpl event = new RegionEventImpl(this, Operation.REGION_REINITIALIZE, null,
false, getMyId(), generateEventID());
reinitialize(null, event);
}
synchronized (missingRequiredRoles) {
// any number of threads may be waiting on missingRequiredRoles
missingRequiredRoles.notifyAll();
if (hasListener() && id != null) {
// fire afterRoleGain event
RoleEventImpl relEvent = new RoleEventImpl(this, Operation.REGION_CREATE, null, true,
id, newlyAcquiredRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_GAIN, relEvent);
}
}
}
} catch (Exception e) {
logger.fatal("Unexpected exception:", e);
}
});
}
/** Reschedules expiry tasks when reliability is resumed. */
private void resumeExpiration() {
boolean isNoAccess = getMembershipAttributes().getLossAction().isNoAccess();
boolean isLimitedAccess = getMembershipAttributes().getLossAction().isLimitedAccess();
if (!(isNoAccess || isLimitedAccess)) {
return; // early out: expiration was never affected by reliability
}
if (getEntryTimeToLive().getTimeout() > 0 && (isNoAccess || getEntryTimeToLive().getAction()
.isDistributed())) {
rescheduleEntryExpiryTasks();
} else if (getEntryIdleTimeout().getTimeout() > 0 && (isNoAccess || getEntryIdleTimeout()
.getAction().isDistributed())) {
rescheduleEntryExpiryTasks();
} else if (getCustomEntryTimeToLive() != null || getCustomEntryIdleTimeout() != null) {
// Force all entries to be rescheduled
rescheduleEntryExpiryTasks();
}
if (getRegionTimeToLive().getTimeout() > 0 && (isNoAccess || getRegionTimeToLive().getAction()
.isDistributed())) {
addTTLExpiryTask();
}
if (getRegionIdleTimeout().getTimeout() > 0 && (isNoAccess || getRegionIdleTimeout().getAction()
.isDistributed())) {
addIdleExpiryTask();
}
}
/**
* A boolean used to indicate whether this is initialization time, i.e. the distributed region
* is being created for the first time. The variable is used when reliability is lost.
*/
private boolean isInitializingThread = false;
/**
* Called when reliability is lost. If MembershipAttributes are configured with
* {@link LossAction#RECONNECT} then DistributedSystem reconnect will be called asynchronously.
*
* @return true if asynchronous resumption is triggered
*/
private boolean lostReliability(final InternalDistributedMember id, final Set newlyMissingRoles) {
if (DistributedRegion.ignoreReconnect) { // test hook
return false;
}
boolean async = false;
try {
if (getMembershipAttributes().getLossAction().isReconnect()) {
async = true;
doLostReliability(isInitializingThread, id, newlyMissingRoles);
}
} catch (CancelException cce) {
throw cce;
} catch (Exception e) {
logger.fatal("Unexpected exception:",
e);
}
return async;
}
private void doLostReliability(boolean isInitializing, final InternalDistributedMember id,
final Set newlyMissingRoles) {
try {
if (!isInitializing) {
// moved code to a new thread.
Thread thread = new LoggingThread("Reconnect Distributed System", () -> {
try {
logger.debug(
"Reliability loss with policy of reconnect and membership thread doing reconnect");
initializationLatchAfterMemberTimeout.await();
getSystem().tryReconnect(false, "Role Loss", getCache());
synchronized (missingRequiredRoles) {
// any number of threads may be waiting on missingRequiredRoles
missingRequiredRoles.notifyAll();
// need to fire an event if id is not null
if (hasListener() && id != null) {
RoleEventImpl relEvent = new RoleEventImpl(this,
Operation.CACHE_RECONNECT, null, true, id, newlyMissingRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
}
}
} catch (Exception e) {
logger.fatal("Unexpected exception:", e);
}
});
thread.start();
} else {
getSystem().tryReconnect(false, "Role Loss", getCache()); // added for
// reconnect.
synchronized (missingRequiredRoles) {
// any number of threads may be waiting on missingRequiredRoles
missingRequiredRoles.notifyAll();
// need to fire an event if id is not null
if (hasListener() && id != null) {
RoleEventImpl relEvent = new RoleEventImpl(this,
Operation.CACHE_RECONNECT, null, true, id, newlyMissingRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
}
}
}
} catch (CancelException cce) {
throw cce;
} catch (Exception e) {
logger.fatal("Unexpected exception:",
e);
}
}
private void lockCheckReadiness() {
cache.getCancelCriterion().checkCancelInProgress(null);
checkReadiness();
}
@Override
Object validatedDestroy(Object key, EntryEventImpl event)
throws TimeoutException, EntryNotFoundException, CacheWriterException {
Lock dlock = getDistributedLockIfGlobal(key);
try {
return super.validatedDestroy(key, event);
} finally {
if (dlock != null) {
dlock.unlock();
}
}
}
@Override
void localDestroyNoCallbacks(Object key) {
super.localDestroyNoCallbacks(key);
if (getScope().isGlobal()) {
try {
getLockService().freeResources(key);
} catch (LockServiceDestroyedException ignore) {
}
}
}
@Override
public void localDestroy(Object key, Object aCallbackArgument) throws EntryNotFoundException {
super.localDestroy(key, aCallbackArgument);
if (getScope().isGlobal()) {
try {
getLockService().freeResources(key);
} catch (LockServiceDestroyedException ignore) {
}
}
}
@Override
public void invalidate(Object key, Object aCallbackArgument)
throws TimeoutException, EntryNotFoundException {
validateKey(key);
checkReadiness();
checkForLimitedOrNoAccess();
Lock dlock = getDistributedLockIfGlobal(key);
try {
validatedInvalidate(key, aCallbackArgument);
} finally {
if (dlock != null)
dlock.unlock();
}
}
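/**
 * Returns a lock spanning the whole region; only supported for GLOBAL scope. A minimal usage
 * sketch (assumes a reference to a GLOBAL-scope region):
 *
 * <pre>{@code
 * Lock regionLock = region.getRegionDistributedLock();
 * regionLock.lock();
 * try {
 *   // operations here are serialized against other holders of the region lock
 * } finally {
 *   regionLock.unlock();
 * }
 * }</pre>
 */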
@Override
public Lock getRegionDistributedLock() throws IllegalStateException {
lockCheckReadiness();
checkForLimitedOrNoAccess();
if (!scope.isGlobal()) {
throw new IllegalStateException(
String.format(
"Distribution locks are only supported for regions with GLOBAL scope, not %s",
scope));
}
return new RegionDistributedLock();
}
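/**
 * Returns an entry-level distributed lock for regions with GLOBAL scope. A minimal usage
 * sketch (hypothetical key and value):
 *
 * <pre>{@code
 * Lock entryLock = region.getDistributedLock("someKey");
 * entryLock.lock();
 * try {
 *   region.put("someKey", "someValue");
 * } finally {
 *   entryLock.unlock();
 * }
 * }</pre>
 */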
@Override
public Lock getDistributedLock(Object key) throws IllegalStateException {
validateKey(key);
lockCheckReadiness();
checkForLimitedOrNoAccess();
if (!scope.isGlobal()) {
throw new IllegalStateException(
String.format(
"Distribution locks are only supported for regions with GLOBAL scope, not %s",
scope));
}
if (isLockingSuspendedByCurrentThread()) {
throw new IllegalStateException(
"This thread has suspended all locking for this region");
}
return new DistributedLock(key);
}
@Override
public void preInitialize() {
Set<String> allGatewaySenderIds = getAllGatewaySenderIds();
if (!allGatewaySenderIds.isEmpty()) {
for (GatewaySender sender : cache.getAllGatewaySenders()) {
if (sender.isParallel() && allGatewaySenderIds.contains(sender.getId())) {
// Once decided to support REPLICATED regions with parallel
// gateway-sender/asynchronous-event-queue, ShadowPartitionedRegionForUserRR should be
// called and this validation should be removed.
if (sender.getId().contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX)) {
throw new AsyncEventQueueConfigurationException(
String.format(
"Parallel Async Event Queue %s can not be used with replicated region %s",
AsyncEventQueueImpl.getAsyncEventQueueIdFromSenderId(sender.getId()),
getFullPath()));
} else {
throw new GatewaySenderConfigurationException(
String.format(
"Parallel gateway sender %s can not be used with replicated region %s",
sender.getId(), getFullPath()));
}
}
}
}
}
/**
* Called while NOT holding lock on parent's subregions
*
* @throws IllegalStateException if region is not compatible with a region in another VM.
*/
@Override
public void initialize(InputStream snapshotInputStream, InternalDistributedMember imageTarget,
InternalRegionArguments internalRegionArgs)
throws TimeoutException, IOException, ClassNotFoundException {
Assert.assertTrue(!isInitialized());
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.initialize BEGIN: {}", getFullPath());
}
if (scope.isGlobal()) {
getLockService(); // create lock service eagerly now
}
try {
try {
PersistentMemberID persistentMemberId = null;
boolean recoverFromDisk = isRecoveryNeeded();
DiskRegion dskRgn = getDiskRegion();
if (recoverFromDisk) {
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.getInitialImageAndRecovery: Starting Recovery");
}
dskRgn.initializeOwner(this); // do recovery
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.getInitialImageAndRecovery: Finished Recovery");
}
persistentMemberId = dskRgn.getMyPersistentID();
}
// Create OQL indexes before starting GII.
createOQLIndexes(internalRegionArgs, recoverFromDisk);
if (getDataPolicy().withReplication() || getDataPolicy().withPreloaded()) {
getInitialImageAndRecovery(snapshotInputStream, imageTarget, internalRegionArgs,
recoverFromDisk, persistentMemberId);
} else {
new CreateRegionProcessor(this).initializeRegion();
if (snapshotInputStream != null) {
releaseBeforeGetInitialImageLatch();
loadSnapshotDuringInitialization(snapshotInputStream);
}
}
} catch (DiskAccessException dae) {
handleDiskAccessException(dae, true);
throw dae;
}
initMembershipRoles();
isInitializingThread = false;
// makes sure all latches are released if they haven't been already
super.initialize(null, null, null);
} finally {
getEventTracker().setInitialized();
}
}
@Override
void initialized() {
new UpdateAttributesProcessor(this).distribute(false);
}
/** True if GII was impacted by missing required roles */
private boolean giiMissingRequiredRoles = false;
/**
* A reference counter (a set of members) used to protect the memoryThresholdReached boolean
*/
private final Set<DistributedMember> memoryThresholdReachedMembers = new HashSet<>();
// TODO: cleanup getInitialImageAndRecovery
private void getInitialImageAndRecovery(InputStream snapshotInputStream,
InternalDistributedMember imageSrc, InternalRegionArguments internalRegionArgs,
boolean recoverFromDisk, PersistentMemberID persistentId) throws TimeoutException {
logger.info("Initializing region {}",
getName());
ImageState imgState = getImageState();
imgState.init();
boolean targetRecreated = internalRegionArgs.getRecreateFlag();
if (recoverFromDisk && snapshotInputStream != null) {
throw new InternalGemFireError(
String.format(
"if loading a snapshot, then should not be recovering; isRecovering= %s ,snapshotStream= %s",
true, snapshotInputStream));
}
ProfileExchangeProcessor targetProvider;
if (getDataPolicy().withPersistence()) {
targetProvider =
new CreatePersistentRegionProcessor(this, getPersistenceAdvisor(), recoverFromDisk);
} else {
// this will go in the advisor profile
targetProvider = new CreateRegionProcessor(this);
}
imgState.setInRecovery(false);
RegionVersionVector recovered_rvv = null;
if (getDataPolicy().withPersistence()) {
recovered_rvv = getVersionVector() == null ? null
: getVersionVector().getCloneForTransmission();
}
// initializeRegion will send out our profile
targetProvider.initializeRegion();
if (persistenceAdvisor != null) {
persistenceAdvisor.initialize();
}
// Register listener here so that the remote members are known
// since registering calls initializeCriticalMembers (which needs to know about
// remote members)
if (!isInternalRegion()) {
if (!isDestroyed) {
cache.getInternalResourceManager().addResourceListener(ResourceType.MEMORY, this);
}
}
releaseBeforeGetInitialImageLatch();
// allow GII to invoke test hooks. Do this just after releasing the
// before-gii latch for bug #48962. See ConcurrentLeaveDuringGIIDUnitTest
InitialImageOperation.beforeGetInitialImage(this);
if (snapshotInputStream != null) {
try {
if (logger.isDebugEnabled()) {
logger.debug(
"DistributedRegion.getInitialImageAndRecovery: About to load snapshot, isInitialized={}; {}",
isInitialized(), getFullPath());
}
loadSnapshotDuringInitialization(snapshotInputStream);
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
return;
}
// No snapshot provided, use the imageTarget(s)
// if we were given a recommended imageTarget, use that first, and
// treat it like it is a replicate (regardless of whether it actually is
// or not)
InitialImageOperation iiop = new InitialImageOperation(this, entries);
CacheDistributionAdvisor.InitialImageAdvice advice = null;
while (!isDestroyed()) {
advice = targetProvider.getInitialImageAdvice(advice);
boolean attemptGetFromOne = imageSrc != null // we were given a specific member
|| getDataPolicy().withPreloaded() && !advice.preloaded.isEmpty() // this is a
// preloaded
// region
|| (!advice.replicates.isEmpty());
// That is: if we have 0 or 1 giiProvider then we can do a getFromOne gii;
// if we have 2 or more giiProviders then we must do a getFromAll gii.
if (attemptGetFromOne) {
if (recoverFromDisk) {
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
CacheObserverHolder.getInstance().afterMarkingGIIStarted();
}
}
{
// If we have an imageSrc and the target is reinitializing mark the
// getInitialImage so that it will wait until the target region is fully initialized
// before responding to the get image request. Otherwise, the
// source may respond with no data because it is still initializing,
// e.g. loading a snapshot.
// Plan A: use specified imageSrc, if specified
if (imageSrc != null) {
try {
GIIStatus ret = iiop.getFromOne(Collections.singleton(imageSrc), targetRecreated,
advice, recoverFromDisk, recovered_rvv);
if (GIIStatus.didGII(ret)) {
giiMissingRequiredRoles = false;
cleanUpDestroyedTokensAndMarkGIIComplete(ret);
return;
}
} finally {
imageSrc = null;
}
}
// Plan C: use a replicate, if one exists
GIIStatus ret =
iiop.getFromOne(advice.replicates, false, advice, recoverFromDisk, recovered_rvv);
if (GIIStatus.didGII(ret)) {
cleanUpDestroyedTokensAndMarkGIIComplete(ret);
return;
}
// Plan D: if this is a PRELOADED region, fetch from another PRELOADED
if (getDataPolicy().isPreloaded()) {
GIIStatus ret_preload =
iiop.getFromOne(advice.preloaded, false, advice, recoverFromDisk, recovered_rvv);
if (GIIStatus.didGII(ret_preload)) {
cleanUpDestroyedTokensAndMarkGIIComplete(ret_preload);
return;
}
} // isPreloaded
}
// If we got to this point, we failed in the GII. Cleanup
// any partial image we received
cleanUpAfterFailedGII(recoverFromDisk);
} // attemptGetFromOne
else {
if (!isDestroyed()) {
if (recoverFromDisk) {
logger.info(
"Region {} recovered from the local disk. Old persistent ID: {}, new persistent ID {}",
getFullPath(), persistentId, getPersistentID());
if (persistentId != null) {
RegionLogger.logRecovery(getFullPath(), persistentId,
getDistributionManager().getDistributionManagerId());
}
} else {
RegionLogger.logCreate(getFullPath(),
getDistributionManager().getDistributionManagerId());
if (getPersistentID() != null) {
RegionLogger.logPersistence(getFullPath(),
getDistributionManager().getDistributionManagerId(), getPersistentID());
logger.info("Region {} was created on this member with the persistent id {}.",
new Object[] {getFullPath(), getPersistentID()});
}
}
cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
return;
}
break;
}
}
}
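/**
 * Schedules a task on the cache's CCP timer that, after the given delay, synchronizes this
 * region for the given lost member.
 */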
public void scheduleSynchronizeForLostMember(InternalDistributedMember member,
VersionSource lostVersionID, long delay) {
getGemFireCache().getCCPTimer().schedule(new SystemTimer.SystemTimerTask() {
@Override
public void run2() {
performSynchronizeForLostMemberTask(member, lostVersionID);
}
}, delay);
}
void performSynchronizeForLostMemberTask(InternalDistributedMember member,
VersionSource lostVersionID) {
if (!isInitializedWithWait()) {
return;
}
synchronizeForLostMember(member, lostVersionID);
}
/**
* If this region has concurrency controls enabled this will pull any missing changes from other
* replicates using InitialImageOperation and a filtered chunking protocol.
*/
void synchronizeForLostMember(InternalDistributedMember lostMember,
VersionSource lostVersionID) {
if (!getConcurrencyChecksEnabled()) {
return;
}
CacheDistributionAdvisor advisor = getCacheDistributionAdvisor();
Set<InternalDistributedMember> targets = advisor.adviseInitializedReplicates();
for (InternalDistributedMember target : targets) {
synchronizeWith(target, lostVersionID, lostMember);
}
}
/**
* Synchronize with another member with respect to messages from the given "lost" member. This
* can be used when a primary bucket crashes to ensure that interrupted message distribution is
* mended.
*/
private void synchronizeWith(InternalDistributedMember target, VersionSource versionMember,
InternalDistributedMember lostMember) {
InitialImageOperation op = new InitialImageOperation(this, entries);
op.synchronizeWith(target, versionMember, lostMember);
}
/**
* This is invoked by syncForCrashedMember when scheduling region synchronization
* triggered by a member-departed event. It sets the regionSynchronizeScheduledOrDone
* flag in the region version holder to true, indicating that no additional region sync for
* the lost member is needed when requests for region sync for that member are received.
*/
public void setRegionSynchronizeScheduled(VersionSource lostMemberVersionID) {
RegionVersionHolder regionVersionHolder =
getVersionVector().getHolderForMember(lostMemberVersionID);
if (regionVersionHolder != null) {
regionVersionHolder.setRegionSynchronizeScheduled();
}
}
/**
* This method checks the region version holder to see if regionSynchronizeScheduledOrDone is
* set to true for the lost member. If it is not, the variable is set to true and this method
* returns true. If it is already set, nothing is done and this method returns false.
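* <p>
* A minimal caller-side sketch (hypothetical variable names):
*
* <pre>{@code
* if (region.setRegionSynchronizedWithIfNotScheduled(lostMemberVersionID)) {
*   // this caller won the race and should perform the region synchronization
*   region.synchronizeForLostMember(lostMember, lostMemberVersionID);
* }
* }</pre>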
*/
public boolean setRegionSynchronizedWithIfNotScheduled(VersionSource lostMemberVersionID) {
RegionVersionHolder regionVersionHolder =
getVersionVector().getHolderForMember(lostMemberVersionID);
if (regionVersionHolder != null) {
return regionVersionHolder.setRegionSynchronizeScheduledOrDoneIfNot();
}
return false;
}
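/**
 * Waits, polling every 100 milliseconds, until this region finishes initializing. Returns
 * false if the region is destroyed or the waiting thread is interrupted before initialization
 * completes.
 */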
public boolean isInitializedWithWait() {
while (!isInitialized()) {
if (isDestroyed()) {
return false;
} else {
try {
if (logger.isDebugEnabled()) {
logger.debug(
"da.syncForCrashedMember waiting for region to finish initializing: {}", this);
}
Thread.sleep(100);
} catch (InterruptedException e) {
return false;
}
}
}
return true;
}
/** remove any partial entries received in a failed GII */
void cleanUpAfterFailedGII(boolean recoverFromDisk) {
DiskRegion dskRgn = getDiskRegion();
// if we have a persistent region, instead of deleting everything on disk,
// we will just reset the "recovered from disk" flag. After
// the next GII we will delete these entries if they do not come
// in as part of the GII.
if (recoverFromDisk && dskRgn != null && dskRgn.isBackup()) {
dskRgn.resetRecoveredEntries(this);
return;
}
if (!getRegionMap().isEmpty()) {
lockFailedInitialImageWriteLock();
try {
closeEntries();
if (getDiskRegion() != null) {
getDiskRegion().clear(this, null);
}
// clear the left-members and version-tags sets in imageState
getImageState().getLeftMembers();
getImageState().getVersionTags();
// Clear OQL indexes
if (indexManager != null) {
try {
indexManager.rerunIndexCreationQuery();
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug("Exception while clearing indexes after GII failure.", ex);
}
}
}
} finally {
unlockFailedInitialImageWriteLock();
}
}
}
void lockFailedInitialImageWriteLock() {
failedInitialImageLock.writeLock().lock();
}
void unlockFailedInitialImageWriteLock() {
failedInitialImageLock.writeLock().unlock();
}
void lockFailedInitialImageReadLock() {
failedInitialImageLock.readLock().lock();
}
private void unlockFailedInitialImageReadLock() {
failedInitialImageLock.readLock().unlock();
}
@Override
public boolean lockWhenRegionIsInitializing() {
if (!isInitialized()) {
lockFailedInitialImageReadLock();
return true;
}
return false;
}
@Override
public void unlockWhenRegionIsInitializing() {
unlockFailedInitialImageReadLock();
}
private void initMembershipRoles() {
synchronized (advisorListener) {
// hold sync to prevent listener from changing initial members
Set<InternalDistributedMember> others =
distAdvisor.addMembershipListenerAndAdviseGeneric(advisorListener);
advisorListener.addMembers(others);
// initialize missing required roles with initial member info
if (getMembershipAttributes().hasRequiredRoles()) {
// AdvisorListener will also sync on missingRequiredRoles
synchronized (missingRequiredRoles) {
missingRequiredRoles.addAll(getMembershipAttributes().getRequiredRoles());
// remove all the roles we are playing since they will never be
// missing
missingRequiredRoles.removeAll(getSystem().getDistributedMember().getRoles());
for (Object other1 : others) {
DistributedMember other = (DistributedMember) other1;
missingRequiredRoles.removeAll(other.getRoles());
}
}
}
}
if (getMembershipAttributes().hasRequiredRoles()) {
// wait up to memberTimeout for required roles...
// boolean requiredRolesAreMissing = false;
int memberTimeout = getSystem().getConfig().getMemberTimeout();
if (logger.isDebugEnabled()) {
logger.debug("Waiting up to {} for required roles.", memberTimeout);
}
try {
if (giiMissingRequiredRoles) {
// force reliability loss and possibly resumption
isInitializingThread = true;
synchronized (advisorListener) {
synchronized (missingRequiredRoles) {
// forcing state of loss because of bad GII
isMissingRequiredRoles = true;
getCachePerfStats().incReliableRegionsMissing(1);
if (getMembershipAttributes().getLossAction().isAllAccess())
getCachePerfStats().incReliableRegionsMissingFullAccess(1);
else if (getMembershipAttributes().getLossAction().isLimitedAccess())
getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
else if (getMembershipAttributes().getLossAction().isNoAccess())
getCachePerfStats().incReliableRegionsMissingNoAccess(1);
if (logger.isDebugEnabled()) {
logger.debug("GetInitialImage had missing required roles.");
}
// TODO: will this work with RECONNECT and REINITIALIZE?
isInitializingThread = true;
lostReliability(null, null);
if (missingRequiredRoles.isEmpty()) {
// all required roles are present so force resumption
isMissingRequiredRoles = false;
getCachePerfStats().incReliableRegionsMissing(-1);
if (getMembershipAttributes().getLossAction().isAllAccess())
getCachePerfStats().incReliableRegionsMissingFullAccess(-1);
else if (getMembershipAttributes().getLossAction().isLimitedAccess())
getCachePerfStats().incReliableRegionsMissingLimitedAccess(-1);
else if (getMembershipAttributes().getLossAction().isNoAccess())
getCachePerfStats().incReliableRegionsMissingNoAccess(-1);
boolean async = resumeReliability(null, null);
if (async) {
advisorListener.destroyed = true;
}
}
}
}
} else {
if (!getSystem().isLoner()) {
waitForRequiredRoles(memberTimeout);
}
boolean initiateLossAction = false;
synchronized (advisorListener) {
synchronized (missingRequiredRoles) {
if (missingRequiredRoles.isEmpty()) {
Assert.assertTrue(!isMissingRequiredRoles);
if (logger.isDebugEnabled()) {
logger.debug("Initialization completed with all required roles present.");
}
} else {
// starting in state of loss...
isMissingRequiredRoles = true;
getCachePerfStats().incReliableRegionsMissing(1);
if (getMembershipAttributes().getLossAction().isAllAccess())
getCachePerfStats().incReliableRegionsMissingFullAccess(1);
else if (getMembershipAttributes().getLossAction().isLimitedAccess())
getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
else if (getMembershipAttributes().getLossAction().isNoAccess())
getCachePerfStats().incReliableRegionsMissingNoAccess(1);
if (logger.isDebugEnabled()) {
logger.debug("Initialization completed with missing required roles: {}",
missingRequiredRoles);
}
isInitializingThread = true;
initiateLossAction = true;
}
}
}
if (initiateLossAction) {
lostReliability(null, null);
}
}
} catch (RegionDestroyedException ignore) {
// ignored to fix bug 34639; may be thrown by waitForRequiredRoles
} catch (CancelException e) {
// ignored to fix bug 34639; may be thrown by waitForRequiredRoles
if (isInitializingThread) {
throw e;
}
} catch (Exception e) {
logger.fatal("Unexpected exception:", e);
}
}
// open latch which will allow any threads in lostReliability to proceed
initializationLatchAfterMemberTimeout.countDown();
}
private boolean isRecoveryNeeded() {
return getDataPolicy().withPersistence() && getDiskRegion().isRecreated();
}
// called by InitialImageOperation to clean up destroyed tokens
// release afterGetInitialImageInitializationLatch before unlocking
// cleanUpLock
@SuppressWarnings("UL_UNRELEASED_LOCK")
protected void cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus giiStatus) {
// We need to clean up the disk before we release the after get initial image latch
DiskRegion dskRgn = getDiskRegion();
if (dskRgn != null && dskRgn.isBackup()) {
dskRgn.finishInitializeOwner(this, giiStatus);
}
ImageState is = getImageState();
is.lockGII();
// clear the version tag and left-members sets
is.getVersionTags();
is.getLeftMembers();
// remove DESTROYED tokens
RegionVersionVector rvv = is.getClearRegionVersionVector();
try {
Iterator<Object> keysIt = getImageState().getDestroyedEntries();
while (keysIt.hasNext()) {
entries.removeIfDestroyed(keysIt.next());
}
if (rvv != null) {
// clear any entries received in the GII that are older than the RVV versions.
// this can happen if entry chunks were received prior to the clear() being
// processed
clearEntries(rvv);
}
// need to do this before we release the afterGetInitialImageLatch
if (persistenceAdvisor != null) {
persistenceAdvisor.setOnline(GIIStatus.didGII(giiStatus), false, getPersistentID());
}
} finally {
// release after gii lock first so basicDestroy will see isInitialized()
// be true
// when they get the cleanUp lock.
try {
releaseAfterGetInitialImageLatch();
} finally { // make sure unlockGII is done for bug 40001
is.unlockGII();
}
}
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
CacheObserverHolder.getInstance().afterMarkingGIICompleted();
}
// "Initializing region {0}" which is not acompanied by a completed message. Users think thread
// is stuck in some operation. Hence adding this log
logger.info("Initialization of region {} completed", getName());
}
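/**
 * Client (bridge) remove. For GLOBAL scope this takes the entry's distributed lock, performs
 * the remove, then unlocks and frees the lock's resources.
 */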
@Override
public void basicBridgeRemove(Object key, Object expectedOldValue, Object p_callbackArg,
ClientProxyMembershipID memberId, boolean fromClient, EntryEventImpl clientEvent)
throws TimeoutException, EntryNotFoundException, CacheWriterException {
Lock lock = getDistributedLockIfGlobal(key);
try {
super.basicBridgeRemove(key, expectedOldValue, p_callbackArg, memberId, fromClient,
clientEvent);
} finally {
if (lock != null) {
logger.debug("releasing distributed lock on {}", key);
lock.unlock();
getLockService().freeResources(key);
}
}
}
@Override
public void basicBridgeDestroy(Object key, Object p_callbackArg, ClientProxyMembershipID memberId,
boolean fromClient, EntryEventImpl clientEvent)
throws TimeoutException, EntryNotFoundException, CacheWriterException {
Lock lock = getDistributedLockIfGlobal(key);
try {
super.basicBridgeDestroy(key, p_callbackArg, memberId, fromClient, clientEvent);
} finally {
if (lock != null) {
logger.debug("releasing distributed lock on {}", key);
lock.unlock();
getLockService().freeResources(key);
}
}
}
@Override
public void basicBridgeInvalidate(Object key, Object p_callbackArg,
ClientProxyMembershipID memberId, boolean fromClient, EntryEventImpl clientEvent)
throws TimeoutException, EntryNotFoundException, CacheWriterException {
Lock lock = getDistributedLockIfGlobal(key);
try {
super.basicBridgeInvalidate(key, p_callbackArg, memberId, fromClient, clientEvent);
} finally {
if (lock != null) {
logger.debug("releasing distributed lock on {}", key);
lock.unlock();
}
}
}
@Override
public void basicDestroy(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue)
throws EntryNotFoundException, CacheWriterException, TimeoutException {
// disallow local destruction for mirrored (KEYS_VALUES) regions
boolean hasSeen = hasSeenEvent(event);
checkIfReplicatedAndLocalDestroy(event);
try {
boolean invokeWriter = cacheWrite;
if (requiresOneHopForMissingEntry(event)) {
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null /* || re.isTombstone() */ || !generateVersionTag) {
if (getServerProxy() == null) {
// only assert for non-client regions.
Assert.assertTrue(!getDataPolicy().withReplication() || !generateVersionTag);
}
if (!event.isBulkOpInProgress() || getDataPolicy().withStorage()) {
// removeAll will send a single one-hop for empty regions. For other missing entries
// we need to get a valid version number before modifying the local cache
// TODO: deltaGII: verify that delegating to a peer when this region is also a client is
// acceptable
boolean didDistribute =
RemoteDestroyMessage.distribute(event, expectedOldValue, !generateVersionTag);
if (!generateVersionTag && !didDistribute) {
throw new PersistentReplicatesOfflineException();
}
if (didDistribute) {
if (logger.isTraceEnabled()) {
logger.trace("Event after remoteDestroy operation: {}", event);
}
invokeWriter = false; // remote cache invoked the writer
if (event.getVersionTag() == null) {
// if the event wasn't applied by the one-hop replicate it will not have a version tag
// and so should not be applied to this cache
return;
}
}
}
}
}
super.basicDestroy(event, invokeWriter, expectedOldValue);
// if this is a destroy coming in from remote source, free up lock resources
// if this is a local origin destroy, this will happen after lock is
// released
if (scope.isGlobal() && event.isOriginRemote()) {
try {
getLockService().freeResources(event.getKey());
} catch (LockServiceDestroyedException ignore) {
}
}
} finally {
if (hasSeen) {
if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
event.getRemoveAllOperation().addEntry(event, true);
}
if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
// before distribute: DR.destroy with hasSeenEvent; do not notify the gateway again
distributeDestroy(event, expectedOldValue);
event.invokeCallbacks(this, true, false);
}
}
}
}
@Override
public void basicDestroyPart3(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
boolean duringRI, boolean invokeCallbacks, Object expectedOldValue) {
distributeDestroy(event, expectedOldValue);
super.basicDestroyPart3(re, event, inTokenMode, duringRI, invokeCallbacks, expectedOldValue);
}
void distributeDestroy(EntryEventImpl event, Object expectedOldValue) {
if (event.isDistributed() && !event.isOriginRemote() && !event.isBulkOpInProgress()) {
boolean distribute = !event.getInhibitDistribution();
if (distribute) {
// before distribute: DR.destroy has notified the gateway sender earlier
DestroyOperation op = new DestroyOperation(event);
op.distribute();
}
}
}
@Override
boolean evictDestroy(EvictableEntry entry) {
boolean evictDestroyWasDone = super.evictDestroy(entry);
if (evictDestroyWasDone) {
if (scope.isGlobal()) {
try {
getLockService().freeResources(entry.getKey());
} catch (LockServiceDestroyedException ignore) {
}
}
}
return evictDestroyWasDone;
}
@Override
void basicInvalidateRegion(RegionEventImpl event) {
// disallow local invalidation for replicated regions
if (!event.getOperation().isDistributed() && getScope().isDistributed()
&& getDataPolicy().withReplication()) {
throw new IllegalStateException(
"Not allowed to do a local invalidation on a replicated region");
}
if (shouldDistributeInvalidateRegion(event)) {
distributeInvalidateRegion(event);
}
super.basicInvalidateRegion(event);
}
/**
* Decide if InvalidateRegionOperation should be sent to peers. Broken out so that BucketRegion
* can override it.
*
* @return true if {@link InvalidateRegionOperation} should be distributed, false otherwise
*/
protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) {
return event.getOperation().isDistributed() && !event.isOriginRemote();
}
/**
* Distribute the invalidate of a region given its event. This implementation sends the invalidate
* to peers.
*
* @since GemFire 5.7
*/
protected void distributeInvalidateRegion(RegionEventImpl event) {
new InvalidateRegionOperation(event).distribute();
}
@Override
void basicDestroyRegion(RegionEventImpl event, boolean cacheWrite, boolean lock,
boolean callbackEvents) throws CacheWriterException, TimeoutException {
final String path = getFullPath();
// Keep track of regions that are being destroyed. This helps avoid a race
// when another member concurrently creates this region. See bug 42051.
boolean isClose = event.getOperation().isClose();
if (!isClose) {
cache.beginDestroy(path, this);
}
try {
super.basicDestroyRegion(event, cacheWrite, lock, callbackEvents);
// send destroy region operation even if this is a localDestroyRegion (or
// close)
if (!event.isOriginRemote()) {
distributeDestroyRegion(event, true);
} else {
if (!event.isReinitializing()) {
RegionEventImpl localEvent =
new RegionEventImpl(this, Operation.REGION_LOCAL_DESTROY, event.getCallbackArgument(),
false, getMyId(), generateEventID()/* generate EventID */);
distributeDestroyRegion(localEvent, false/* fixes bug 41111 */);
}
}
notifyBridgeClients(event);
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("basicDestroyRegion short-circuited due to cancellation");
}
} finally {
if (!isClose) {
cache.endDestroy(path, this);
}
RegionLogger.logDestroy(path, getMyId(), getPersistentID(), isClose);
}
}
@Override
void distributeDestroyRegion(RegionEventImpl event, boolean notifyOfRegionDeparture) {
if (persistenceAdvisor != null) {
persistenceAdvisor.releaseTieLock();
}
new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
}
/**
* Invalidate the entry for the given event, distributing the invalidation to peers when
* appropriate. Note that this method does not report whether the invalidation actually
* occurred (for example, the entry may already have been invalidated).
*/
@Override
public void basicInvalidate(EntryEventImpl event) throws EntryNotFoundException {
boolean hasSeen = hasSeenEvent(event);
try {
// disallow local invalidation for replicated regions
if (event.isLocalInvalid() && !event.getOperation().isLocal() && getScope().isDistributed()
&& getDataPolicy().withReplication()) {
throw new IllegalStateException(
"Not allowed to do a local invalidation on a replicated region");
}
if (requiresOneHopForMissingEntry(event)) {
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null/* || re.isTombstone() */ || !generateVersionTag) {
if (getServerProxy() == null) {
// only assert for non-client regions.
Assert.assertTrue(!getDataPolicy().withReplication() || !generateVersionTag);
}
// TODO: deltaGII: verify that delegating to a peer when this region is also a client is
// acceptable
boolean didDistribute =
RemoteInvalidateMessage.distribute(event, !generateVersionTag);
if (!generateVersionTag && !didDistribute) {
throw new PersistentReplicatesOfflineException();
}
if (didDistribute) {
if (logger.isDebugEnabled()) {
logger.debug("Event after remoteInvalidate operation: {}", event);
}
if (event.getVersionTag() == null) {
// if the event wasn't applied by the one-hop replicate it will not have a version tag
// and so should not be applied to this cache
return;
}
}
}
}
super.basicInvalidate(event);
} finally {
if (hasSeen) {
if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
distributeInvalidate(event);
event.invokeCallbacks(this, true, false);
}
}
}
}
@Override
void basicInvalidatePart3(RegionEntry re, EntryEventImpl event, boolean invokeCallbacks) {
distributeInvalidate(event);
super.basicInvalidatePart3(re, event, invokeCallbacks);
}
void distributeInvalidate(EntryEventImpl event) {
if (!isRegionInvalid() && event.isDistributed() && !event.isOriginRemote()
&& !isTX() /* only distribute if non-tx */) {
boolean distribute = !event.getInhibitDistribution();
if (distribute) {
// before distribute: DR.invalidate has triggered callbacks earlier
InvalidateOperation op = new InvalidateOperation(event);
op.distribute();
}
}
}
@Override
void basicUpdateEntryVersion(EntryEventImpl event) throws EntryNotFoundException {
InternalRegion internalRegion = event.getRegion();
AbstractRegionMap regionMap = (AbstractRegionMap) internalRegion.getRegionMap();
regionMap.lockForCacheModification(internalRegion, event);
final boolean locked = internalRegion.lockWhenRegionIsInitializing();
try {
try {
if (!hasSeenEvent(event)) {
super.basicUpdateEntryVersion(event);
}
} finally {
if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
distributeUpdateEntryVersion(event);
}
}
} finally {
if (locked) {
internalRegion.unlockWhenRegionIsInitializing();
}
regionMap.releaseCacheModificationLock(internalRegion, event);
}
}
void distributeUpdateEntryVersion(EntryEventImpl event) {
if (!isRegionInvalid() && event.isDistributed() && !event.isOriginRemote()
&& !isTX() /* only distribute if non-tx */) {
// before distribute: DR has sent the callback earlier
UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
op.distribute();
}
}
@Override
protected void basicClear(RegionEventImpl regionEvent) {
Lock dlock = getRegionDistributedLockIfGlobal();
try {
super.basicClear(regionEvent);
} finally {
if (dlock != null) {
dlock.unlock();
}
}
}
@Override
void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
if (getConcurrencyChecksEnabled() && !getDataPolicy().withReplication()) {
boolean retry = false;
do {
// non-replicate regions must defer to a replicate for clear/invalidate of region
Set<InternalDistributedMember> repls = distAdvisor.adviseReplicates();
if (!repls.isEmpty()) {
InternalDistributedMember mbr = repls.iterator().next();
RemoteClearMessage op = RemoteClearMessage.create(mbr, this);
try {
op.distribute();
return;
} catch (CancelException | RegionDestroyedException | RemoteOperationException e) {
getCancelCriterion().checkCancelInProgress(e);
retry = true;
}
}
} while (retry);
}
// if no version vector or if no replicates are around, use the default mechanism
super.basicClear(regionEvent, cacheWrite);
}
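// A descriptive sketch (not executed code) of the RVV-based clear protocol that
// cmnClearRegion below implements when this member replicates with concurrency checks
// enabled:
//
//   synchronized (clearLock) {              // one clear at a time per member (fix 46338)
//     distributedLockForClear();            // dlock on "_clearOperation"
//     obtainWriteLocksForClear(...);        // pause version generation on all participants
//     clearRegionLocally(...);              // wipe the local region contents
//     DistributedClearOperation.clear(...); // propagate the clear to participants
//     releaseWriteLocksForClear(...);       // resume version generation
//     distributedUnlockForClear();
//   }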
@Override
void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
boolean enableRVV = useRVV && getDataPolicy().withReplication()
&& getConcurrencyChecksEnabled() && !getDistributionManager().isLoner();
// Fix for 46338 - apparently multiple threads from the same VM are allowed
// to suspend locking, which is what distributedLockForClear() does. We don't
// want that to happen, so we'll synchronize to make sure only one thread on
// this member performs a clear.
synchronized (clearLock) {
if (enableRVV) {
distributedLockForClear();
try {
Set<InternalDistributedMember> participants =
getCacheDistributionAdvisor().adviseInvalidateRegion();
// pause all generation of versions and flush from the other members to this one
try {
obtainWriteLocksForClear(regionEvent, participants);
clearRegionLocally(regionEvent, cacheWrite, null);
if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
DistributedClearOperation.clear(regionEvent, null, participants);
}
} finally {
releaseWriteLocksForClear(regionEvent, participants);
}
} finally {
distributedUnlockForClear();
}
} else {
Set<InternalDistributedMember> participants =
getCacheDistributionAdvisor().adviseInvalidateRegion();
clearRegionLocally(regionEvent, cacheWrite, null);
if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
DistributedClearOperation.clear(regionEvent, null, participants);
}
}
}
// since clients do not maintain RVVs except for tombstone GC
// we need to ensure that current ops reach the client queues
// before queuing a clear, but there is no infrastructure for doing so
notifyBridgeClients(regionEvent);
}
/**
* Obtain a distributed lock for the clear operation.
*/
private void distributedLockForClear() {
if (!scope.isGlobal()) { // non-global regions must lock when using RVV
try {
getLockService().lock("_clearOperation", -1, -1);
} catch (IllegalStateException e) {
lockCheckReadiness();
throw e;
}
}
}
/**
* Release the distributed lock for the clear operation.
*/
private void distributedUnlockForClear() {
if (!scope.isGlobal()) {
try {
getLockService().unlock("_clearOperation");
} catch (IllegalStateException e) {
lockCheckReadiness();
throw e;
}
}
}
/**
* Obtain locks preventing generation of new versions in other members.
*/
private void obtainWriteLocksForClear(RegionEventImpl regionEvent,
Set<InternalDistributedMember> participants) {
lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
}
/**
* Pause local operations so that a clear() can be performed, and flush communication channels
* to the given member.
*/
void lockLocallyForClear(DistributionManager dm, InternalDistributedMember locker,
CacheEvent event) {
RegionVersionVector rvv = getVersionVector();
ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
if (armLockTestHook != null) {
armLockTestHook.beforeLock(this, event);
}
if (rvv != null) {
// block new operations from being applied to the region map
rvv.lockForClear(getFullPath(), dm, locker);
// Check for region destroyed after we have locked, to make sure
// we don't continue a clear if the region has been destroyed.
checkReadiness();
// Only need to flush if NOACK at this point
if (getAttributes().getScope().isDistributedNoAck()) {
Set<InternalDistributedMember> members = getDistributionAdvisor().adviseCacheOp();
StateFlushOperation.flushTo(members, this);
}
}
if (armLockTestHook != null) {
armLockTestHook.afterLock(this, null);
}
}
/**
* Releases the locks obtained in obtainWriteLocksForClear.
*/
private void releaseWriteLocksForClear(RegionEventImpl regionEvent,
Set<InternalDistributedMember> participants) {
ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
if (armLockTestHook != null) {
armLockTestHook.beforeRelease(this, regionEvent);
}
getVersionVector().unlockForClear(getMyId());
DistributedClearOperation.releaseLocks(regionEvent, participants);
if (armLockTestHook != null) {
armLockTestHook.afterRelease(this, regionEvent);
}
}
/**
* Wait for in-progress clears that were initiated by this member.
*/
private void waitForInProgressClear() {
RegionVersionVector rvv = getVersionVector();
if (rvv != null) {
synchronized (clearLock) {
// do nothing;
// I'm a little scared that the compiler might optimize
// away this synchronization if we really do nothing. Hence
// my fine log message below. This might not be necessary.
if (logger.isDebugEnabled()) {
logger.debug("Done waiting for clear");
}
}
}
}
/**
* Distribute Tombstone garbage-collection information to all peers with storage
*/
protected EventID distributeTombstoneGC(Set<Object> keysRemoved) {
getCachePerfStats().incTombstoneGCCount();
EventID eventId = new EventID(getSystem());
DistributedTombstoneOperation gc = DistributedTombstoneOperation.gc(this, eventId);
gc.distribute();
notifyClientsOfTombstoneGC(getVersionVector().getTombstoneGCVector(), keysRemoved, eventId,
null);
return eventId;
}
@Override
void basicLocalClear(RegionEventImpl rEvent) {
if (getScope().isDistributed() && getDataPolicy().withReplication()) {
throw new UnsupportedOperationException(
"localClear is not supported on distributed replicated regions.");
}
super.basicLocalClear(rEvent);
}
public DistributionConfig getDistributionConfig() {
return getSystem().getDistributionManager().getConfig();
}
@Override
public CacheDistributionAdvisor getDistributionAdvisor() {
return distAdvisor;
}
@Override
public CacheDistributionAdvisor getCacheDistributionAdvisor() {
return distAdvisor;
}
public PersistenceAdvisor getPersistenceAdvisor() {
return persistenceAdvisor;
}
public PersistentMemberID getPersistentID() {
return persistentId;
}
/** Returns the distribution profile; lazily creates one if needed */
@Override
public Profile getProfile() {
return distAdvisor.createProfile();
}
@Override
public void fillInProfile(Profile profile) {
assert profile instanceof CacheProfile;
CacheProfile cacheProfile = (CacheProfile) profile;
cacheProfile.dataPolicy = getDataPolicy();
cacheProfile.hasCacheLoader = basicGetLoader() != null;
cacheProfile.hasCacheWriter = basicGetWriter() != null;
cacheProfile.hasCacheListener = hasListener();
Assert.assertTrue(scope.isDistributed());
cacheProfile.scope = scope;
cacheProfile.inRecovery = getImageState().getInRecovery();
cacheProfile.isPersistent = getDataPolicy().withPersistence();
cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes());
// The PDX check below was added for rolling upgrade support when the old WAN was removed.
// PDX regions are always gatewayEnabled, irrespective of whether a gatewayHub is
// configured, and old-version PDX regions always have this attribute set to true, so to
// avoid region attribute comparison failures across members we set it to true here.
cacheProfile.isGatewayEnabled = isPdxTypesRegion();
cacheProfile.serialNumber = getSerialNumber();
cacheProfile.regionInitialized = isInitialized();
cacheProfile.persistentID = getPersistentID();
if (getPersistenceAdvisor() != null) {
cacheProfile.persistenceInitialized = getPersistenceAdvisor().isOnline();
}
cacheProfile.hasCacheServer = !cache.getCacheServers().isEmpty();
cacheProfile.requiresOldValueInEvents = getDataPolicy().withReplication()
&& filterProfile != null && filterProfile.hasCQs();
cacheProfile.gatewaySenderIds = getGatewaySenderIds();
cacheProfile.asyncEventQueueIds = getVisibleAsyncEventQueueIds();
cacheProfile.isOffHeap = getOffHeap();
}
/**
* Return the DistributedLockService associated with this Region. This method will lazily create
* that service the first time it is invoked on this region.
*/
public DistributedLockService getLockService() {
synchronized (dlockMonitor) {
String dlsName = getFullPath();
if (dlockService == null) {
dlockService = DistributedLockService.getServiceNamed(dlsName);
if (dlockService == null) {
// region destroy will destroy dls and manual freeResources only
dlockService = DLockService.create(getFullPath(), getSystem(), true, false, false);
}
// handle is-lock-grantor region attribute...
if (isLockGrantor) {
dlockService.becomeLockGrantor();
}
if (logger.isDebugEnabled()) {
logger.debug("LockService for {} is using LockLease={}, LockTimeout={}", dlsName,
getCache().getLockLease(), getCache().getLockTimeout());
}
}
return dlockService;
}
}
@Override
boolean isCurrentlyLockGrantor() {
return scope.isGlobal() && getLockService().isLockGrantor();
}
@Override
public boolean isLockGrantor() {
return scope.isGlobal() && isLockGrantor;
}
@Override
public void becomeLockGrantor() {
checkReadiness();
checkForLimitedOrNoAccess();
if (!scope.isGlobal()) {
throw new IllegalStateException(
String.format(
"Distribution locks are only supported for regions with GLOBAL scope, not %s",
scope));
}
DistributedLockService svc = getLockService();
try {
super.becomeLockGrantor();
if (!svc.isLockGrantor()) {
svc.becomeLockGrantor();
}
} finally {
if (!svc.isLockGrantor()) {
if (logger.isDebugEnabled()) {
logger.debug("isLockGrantor is false after becomeLockGrantor for {}", getFullPath());
}
}
}
}
/** @return the deserialized value */
@Override
@Retained
Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, TXStateInterface txState,
boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD,
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
boolean returnTombstones) throws CacheLoaderException, TimeoutException {
@Released
EntryEventImpl event = null;
checkForLimitedOrNoAccess();
final Operation op = isCreate ? Operation.CREATE : Operation.UPDATE;
long lastModified = 0L;
try {
event = findOnServer(keyInfo, op, generateCallbacks, clientEvent);
if (event == null) {
event = createEventForLoad(keyInfo, generateCallbacks, requestingClient, op);
lastModified =
findUsingSearchLoad(txState, localValue, clientEvent, keyInfo, event, preferCD);
}
// Update region with new value.
if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
putNewValueInRegion(isCreate, clientEvent, lastModified, event);
} else if (isCreate) {
recordMiss(null, event.getKey());
}
return determineResult(preferCD, event);
} finally {
if (event != null) {
event.release();
}
}
}
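// The private helpers below implement the steps of findObjectInSystem: findOnServer
// asks the server proxy for the value when this region is a client, findUsingSearchLoad
// falls back to a netsearch/netload via SearchLoadAndWriteProcessor, and
// putNewValueInRegion installs the value that was found, distributing it if necessary.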
private EntryEventImpl createEventForLoad(KeyInfo keyInfo, boolean generateCallbacks,
ClientProxyMembershipID requestingClient, Operation op) {
// Do not generate Event ID
EntryEventImpl event = EntryEventImpl.create(this, op, keyInfo.getKey(), null /* newValue */,
keyInfo.getCallbackArg(), false, getMyId(), generateCallbacks);
if (requestingClient != null) {
event.setContext(requestingClient);
}
return event;
}
private Object determineResult(boolean preferCD, EntryEventImpl event) {
if (preferCD) {
return event.getRawNewValueAsHeapObject();
}
return event.getNewValue();
}
private void putNewValueInRegion(boolean isCreate, EntryEventImpl clientEvent, long lastModified,
EntryEventImpl event) {
RegionEntry re = null;
// Set eventId. Required for interested clients.
event.setNewEventId(cache.getDistributedSystem());
long startPut = getStatisticsClock().getTime();
validateKey(event.getKey());
// this next step also distributes the object to other processes, if necessary
try {
re = basicPutEntry(event, lastModified);
// Update client event with latest version tag from re.
if (re != null && clientEvent != null) {
clientEvent.setVersionTag(event.getVersionTag());
}
if (!isTX()) {
getCachePerfStats().endPut(startPut, event.isOriginRemote());
}
} catch (ConcurrentCacheModificationException e) {
// the cache was modified while we were searching for this entry and
// the netsearch result was elided. Return the current value from the cache
updateEventWithCurrentRegionEntry(event, clientEvent);
} catch (CacheWriterException cwe) {
if (logger.isDebugEnabled()) {
logger.debug("findObjectInSystem: writer exception putting entry {} : {}", event, cwe);
}
}
if (isCreate) {
recordMiss(re, event.getKey());
}
}
private void updateEventWithCurrentRegionEntry(EntryEventImpl event, EntryEventImpl clientEvent) {
// defer the lruUpdateCallback to prevent a deadlock (see bug 51121).
final boolean disabled = entries.disableLruUpdateCallback();
try {
RegionEntry re = getRegionEntry(event.getKey());
if (re != null) {
synchronized (re) {
// value & version must be obtained atomically
// Update client event with latest version tag from re
if (clientEvent != null) {
clientEvent.setVersionTag(re.getVersionStamp().asVersionTag());
}
// OFFHEAP: need to increment the ref count, copy to heap for setNewValue, then decrement it
event.setNewValue(re.getValue(this));
}
}
} finally {
if (disabled) {
entries.enableLruUpdateCallback();
}
try {
entries.lruUpdateCallback();
} catch (DiskAccessException dae) {
handleDiskAccessException(dae);
throw dae;
}
}
}
/**
* If this is a client, get the value from the server.
*/
private EntryEventImpl findOnServer(KeyInfo keyInfo, Operation op, boolean generateCallbacks,
EntryEventImpl clientEvent) {
if (getServerProxy() == null) {
return null;
}
EntryEventImpl event = null;
VersionTagHolder holder = new VersionTagHolder();
Object aCallbackArgument = keyInfo.getCallbackArg();
Object value = getServerProxy().get(keyInfo.getKey(), aCallbackArgument, holder);
if (value != null) {
event = EntryEventImpl.create(this, op, keyInfo.getKey(), value, aCallbackArgument, false,
getMyId(), generateCallbacks);
event.setVersionTag(holder.getVersionTag());
event.setFromServer(true); // fix for bug 39358
if (clientEvent != null && clientEvent.getVersionTag() == null) {
clientEvent.setVersionTag(holder.getVersionTag());
}
}
return event;
}
private long findUsingSearchLoad(TXStateInterface txState, Object localValue,
EntryEventImpl clientEvent, final KeyInfo keyInfo, EntryEventImpl event, boolean preferCD) {
long lastModified = 0L;
// If this event is because of a register interest call, don't invoke the CacheLoader
boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null
&& clientEvent.getOperation().isGetForRegisterInterest();
if (!getForRegisterInterest) {
SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
try {
processor.initialize(this, keyInfo.getKey(), keyInfo.getCallbackArg());
// processor fills in event
processor.doSearchAndLoad(event, txState, localValue, preferCD);
if (clientEvent != null && clientEvent.getVersionTag() == null) {
clientEvent.setVersionTag(event.getVersionTag());
}
lastModified = processor.getLastModified();
} finally {
processor.release();
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.findObjectInSystem skipping loader for region="
+ getFullPath() + "; key=" + keyInfo.getKey());
}
}
return lastModified;
}
/**
* @return true if cacheWrite was performed
*/
@Override
public boolean cacheWriteBeforeDestroy(EntryEventImpl event, Object expectedOldValue)
throws CacheWriterException, EntryNotFoundException, TimeoutException {
boolean result = false;
if (event.isDistributed()) {
CacheWriter localWriter = basicGetWriter();
Set netWriteRecipients = localWriter == null ? distAdvisor.adviseNetWrite() : null;
if ((localWriter != null || (netWriteRecipients != null && !netWriteRecipients.isEmpty()))
&& !event.inhibitAllNotifications()) {
final long start = getCachePerfStats().startCacheWriterCall();
try {
event.setOldValueFromRegion();
SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
try {
processor.initialize(this, event.getKey(), null);
processor.doNetWrite(event, netWriteRecipients, localWriter,
SearchLoadAndWriteProcessor.BEFOREDESTROY);
result = true;
} finally {
processor.release();
}
} finally {
getCachePerfStats().endCacheWriterCall(start);
}
}
serverDestroy(event, expectedOldValue);
}
return result;
}
@Override
boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event)
throws CacheWriterException, TimeoutException {
boolean result = false;
if (event.getOperation().isDistributed()) {
CacheWriter localWriter = basicGetWriter();
Set netWriteRecipients = localWriter == null ? distAdvisor.adviseNetWrite() : null;
if (localWriter != null || netWriteRecipients != null && !netWriteRecipients.isEmpty()) {
final long start = getCachePerfStats().startCacheWriterCall();
try {
SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
try {
processor.initialize(this, "preDestroyRegion", null);
processor.doNetWrite(event, netWriteRecipients, localWriter,
SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY);
result = true;
} finally {
processor.release();
}
} finally {
getCachePerfStats().endCacheWriterCall(start);
}
}
serverRegionDestroy(event);
}
return result;
}
protected void distributedRegionCleanup(RegionEventImpl event) {
if (event == null || event.getOperation() != Operation.REGION_REINITIALIZE) {
// only perform this if the reinitialize is not due to resumption (REGION_REINITIALIZE);
// a null event means this was a failed initialize (create).
// wake up any threads in waitForRequiredRoles... they will checkReadiness
synchronized (missingRequiredRoles) {
missingRequiredRoles.notifyAll();
}
}
if (persistenceAdvisor != null) {
persistenceAdvisor.close(); // fix for bug 41094
}
distAdvisor.close();
// Fix for bug 46338. Wait for in progress clears before destroying the
// lock service, because destroying the service immediately releases the dlock
waitForInProgressClear();
DLockService dls = null;
synchronized (dlockMonitor) {
if (dlockService != null) {
dls = (DLockService) dlockService;
}
}
if (dls != null) {
try {
dls.destroyAndRemove();
} catch (CancelException e) {
// bug 37118
if (logger.isDebugEnabled()) {
logger.debug("DLS destroy abridged due to shutdown", e);
}
} catch (Exception ex) {
logger.warn("DLS destroy may have failed for " +
getFullPath(),
ex);
}
}
waitForCurrentOperations();
}
private void waitForCurrentOperations() {
// Fix for #48066 - make sure that region operations are completely
// distributed to peers before destroying the region.
boolean flushOnClose =
!Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close"); // test hook
if (!cache.forcedDisconnect() && flushOnClose
&& getDistributionManager().getMembershipManager() != null
&& getDistributionManager().getMembershipManager().isConnected()) {
getDistributionAdvisor().forceNewMembershipVersion();
try {
getDistributionAdvisor().waitForCurrentOperations();
} catch (Exception e) {
// log this but try to close the region so that listeners are invoked
logger.warn(String.format("%s: error closing region %s", this, getFullPath()), e);
}
}
}
/**
* In addition to inherited code this method also invokes RegionMembershipListeners
*/
@Override
public void postCreateRegion() {
super.postCreateRegion();
// should we sync on this.distAdvisor first to prevent bug 44369?
synchronized (advisorListener) {
Set<InternalDistributedMember> others = advisorListener.getInitialMembers();
CacheListener[] listeners = fetchCacheListenersField();
if (listeners != null) {
for (CacheListener listener : listeners) {
if (listener instanceof RegionMembershipListener) {
RegionMembershipListener regionMembershipListener = (RegionMembershipListener) listener;
try {
DistributedMember[] otherDms = others.toArray(new DistributedMember[0]);
regionMembershipListener.initialMembers(this, otherDms);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.error("Exception occurred in RegionMembershipListener",
t);
}
}
}
}
}
}
/**
* Free resources held by this region. This method is invoked after isDestroyed has been set to
* true.
*/
@Override
protected void postDestroyRegion(boolean destroyDiskRegion, RegionEventImpl event) {
distributedRegionCleanup(event);
try {
super.postDestroyRegion(destroyDiskRegion, event);
} catch (CancelException e) {
// I don't think this should ever happen: bulletproofing for bug 39454
logger.warn("postDestroyRegion: encountered cancellation", e);
}
}
@Override
public void cleanupFailedInitialization() {
super.cleanupFailedInitialization();
try {
RegionEventImpl ev = new RegionEventImpl(this, Operation.REGION_CLOSE, null, false, getMyId(),
generateEventID());
distributeDestroyRegion(ev, true);
distributedRegionCleanup(null);
} catch (RegionDestroyedException ignore) {
// someone else must have concurrently destroyed the region (maybe a distributed destroy)
} catch (CancelException ignore) {
// cache or DS is closed, ignore
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable t) {
logger.warn("Error cleaning up after failed region initialization of region " + this,
t);
}
}
@Override
public void handleCacheClose(Operation operation) {
try {
super.handleCacheClose(operation);
} finally {
distributedRegionCleanup(null);
}
}
/**
* invoke a cache writer before a put is performed elsewhere
*/
@Override
public void cacheWriteBeforePut(EntryEventImpl event, Set netWriteRecipients,
CacheWriter localWriter, boolean requireOldValue, Object expectedOldValue)
throws CacheWriterException, TimeoutException {
if ((localWriter != null || (netWriteRecipients != null && !netWriteRecipients.isEmpty()))
&& !event.inhibitAllNotifications()) {
final boolean isNewKey = event.getOperation().isCreate();
final long start = getCachePerfStats().startCacheWriterCall();
try {
SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
processor.initialize(this, "preUpdate", null);
try {
if (!isNewKey) {
processor.doNetWrite(event, netWriteRecipients, localWriter,
SearchLoadAndWriteProcessor.BEFOREUPDATE);
} else {
processor.doNetWrite(event, netWriteRecipients, localWriter,
SearchLoadAndWriteProcessor.BEFORECREATE);
}
} finally {
processor.release();
}
} finally {
getCachePerfStats().endCacheWriterCall(start);
}
}
serverPut(event, requireOldValue, expectedOldValue);
}
@Override
protected void cacheListenersChanged(boolean nowHasListener) {
if (nowHasListener) {
advisorListener.initRMLWrappers();
}
new UpdateAttributesProcessor(this).distribute();
}
@Override
protected void cacheWriterChanged(CacheWriter oldWriter) {
super.cacheWriterChanged(oldWriter);
if (oldWriter == null ^ basicGetWriter() == null) {
new UpdateAttributesProcessor(this).distribute();
}
}
@Override
protected void cacheLoaderChanged(CacheLoader oldLoader) {
super.cacheLoaderChanged(oldLoader);
if (oldLoader == null ^ basicGetLoader() == null) {
new UpdateAttributesProcessor(this).distribute();
}
}
@Override
public void addGatewaySenderId(String gatewaySenderId) {
super.addGatewaySenderId(gatewaySenderId);
new UpdateAttributesProcessor(this).distribute();
updateSenderIdMonitor();
}
@Override
public void removeGatewaySenderId(String gatewaySenderId) {
super.removeGatewaySenderId(gatewaySenderId);
new UpdateAttributesProcessor(this).distribute();
updateSenderIdMonitor();
}
@Override
public void addAsyncEventQueueId(String asyncEventQueueId) {
super.addAsyncEventQueueId(asyncEventQueueId);
new UpdateAttributesProcessor(this).distribute();
updateSenderIdMonitor();
}
@Override
public void removeAsyncEventQueueId(String asyncEventQueueId) {
super.removeAsyncEventQueueId(asyncEventQueueId);
new UpdateAttributesProcessor(this).distribute();
updateSenderIdMonitor();
}
SenderIdMonitor createSenderIdMonitor() {
return SenderIdMonitor.createSenderIdMonitor(this, this.distAdvisor);
}
void updateSenderIdMonitor() {
this.senderIdMonitor.update();
}
@Override
void checkSameSenderIdsAvailableOnAllNodes() {
this.senderIdMonitor.checkSenderIds();
}
/**
* Wraps call to dlock service in order to throw RegionDestroyedException if dlock service throws
* IllegalStateException and isDestroyed is true.
*/
private boolean isLockingSuspendedByCurrentThread() {
try {
return getLockService().isLockingSuspendedByCurrentThread();
} catch (IllegalStateException e) {
lockCheckReadiness();
throw e;
}
}
/**
* If this region's scope is GLOBAL, get a distributed lock on the given key, and return the Lock.
* The caller is responsible for unlocking.
*
* @return the acquired Lock if the region is GLOBAL, otherwise null.
*
* @throws NullPointerException if key is null
*/
private Lock getDistributedLockIfGlobal(Object key) throws TimeoutException {
if (getScope().isGlobal()) {
if (isLockingSuspendedByCurrentThread()) {
return null;
}
long start = System.currentTimeMillis();
long timeLeft = getCache().getLockTimeout();
long lockTimeout = timeLeft;
String msg = null;
Object[] msgArgs = null;
while (timeLeft > 0 || lockTimeout == -1) {
cache.getCancelCriterion().checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
Lock dlock = getDistributedLock(key);
if (!dlock.tryLock(timeLeft, TimeUnit.SECONDS)) {
msg = "Attempt to acquire distributed lock for %s failed after waiting %s seconds.";
msgArgs = new Object[] {key, (System.currentTimeMillis() - start) / 1000L};
break;
}
return dlock;
} catch (InterruptedException ex) {
interrupted = true;
cache.getCancelCriterion().checkCancelInProgress(ex);
// TODO: Why is it OK to keep going?
if (lockTimeout > -1) {
timeLeft = getCache().getLockTimeout() - (System.currentTimeMillis() - start) / 1000L;
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // while
if (msg == null) {
msg = "Timed out after waiting %s seconds for the distributed lock for %s.";
msgArgs = new Object[] {getCache().getLockTimeout(), key};
}
throw new TimeoutException(String.format(msg, msgArgs));
} else {
return null;
}
}
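// Illustrative caller pattern for getDistributedLockIfGlobal, mirroring the
// basicBridgeRemove/basicBridgeDestroy methods above; when non-null, the returned
// lock must be released in a finally block and its resources freed:
//
//   Lock lock = getDistributedLockIfGlobal(key);
//   try {
//     // ... modify the entry ...
//   } finally {
//     if (lock != null) {
//       lock.unlock();
//       getLockService().freeResources(key);
//     }
//   }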
/**
* Checks whether the entry is not valid.
*
* @return true if the entry is null, or is removed and is not a tombstone
*/
protected boolean checkEntryNotValid(RegionEntry mapEntry) {
return mapEntry == null || mapEntry.isRemoved() && !mapEntry.isTombstone();
}
/**
* Get the best iterator for iterating over the contents of this region. This method will either
* return an iterator that uses hash ordering from the entry map or, in the case of an overflow
* region, an iterator that iterates over the entries in disk order.
*/
@Override
public Iterator<RegionEntry> getBestIterator(boolean includeValues) {
DiskRegion dr = getDiskRegion();
if (DiskPage.DISK_PAGE_SIZE > 0 && includeValues && dr != null) {
// Wait for the disk region to recover values first.
dr.waitForAsyncRecovery();
if (dr.getNumOverflowOnDisk() > 0) {
return new DiskSavvyIterator();
}
}
return entries.regionEntries().iterator();
}
/**
* Should only be used if this region has entries on disk that are not in memory. This currently
* happens for overflow and for recovery when values are not recovered. The first phase does a
* normal iteration of the regionEntries, but if it finds an entry that is currently only on
* disk it saves it in a map sorted by location on disk. Once the regionEntries iterator has
* nothing more to iterate, it starts iterating, in disk order, over the entries on disk.
*/
private class DiskSavvyIterator implements Iterator<RegionEntry> {
private boolean usingIt = true;
private Iterator<?> it = entries.regionEntries().iterator();
// iterator for nested ArrayLists
private Iterator<RegionEntry> subIt = null;
private final TreeMap<DiskPage, Object> diskMap = new TreeMap<>();
DiskSavvyIterator() {}
@Override
public boolean hasNext() {
boolean result;
if (subIt != null) {
result = subIt.hasNext();
if (!result) {
subIt = null;
} else {
return result;
}
}
result = it.hasNext();
if (usingIt && !result) {
usingIt = false;
it = diskMap.values().iterator();
result = it.hasNext();
}
return result;
}
@Override
public RegionEntry next() {
for (;;) {
if (subIt != null) {
return subIt.next();
}
if (usingIt) {
RegionEntry regionEntry = (RegionEntry) it.next();
DiskPosition diskPosition = new DiskPosition();
if (regionEntry.isOverflowedToDisk(DistributedRegion.this, diskPosition)) {
// add diskPosition to sorted list
DiskPage dPage = new DiskPage(diskPosition);
Object value = diskMap.get(dPage);
if (value == null) {
diskMap.put(dPage, regionEntry);
} else if (value instanceof ArrayList) {
List<Object> list = (ArrayList<Object>) value;
list.add(regionEntry);
} else {
List<Object> list = new ArrayList<>();
list.add(value);
list.add(regionEntry);
diskMap.put(dPage, list);
}
if (!hasNext()) {
throw new NoSuchElementException();
}
} else {
return regionEntry;
}
} else {
Object value = it.next();
if (value instanceof ArrayList) {
List<RegionEntry> list = (List<RegionEntry>) value;
subIt = list.iterator();
return subIt.next();
} else {
return (RegionEntry) value;
}
}
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
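// Worked example of DiskSavvyIterator's two phases, assuming entries k1 (in memory)
// and k2, k3 (overflowed to disk pages 7 and 3 of the same oplog): phase one iterates
// the region map, returning k1 and deferring k2 and k3 into diskMap; phase two then
// iterates diskMap, yielding k3 before k2, so disk reads proceed in page order rather
// than hash order.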
public static class DiskPosition implements Comparable<DiskPosition> {
long oplogId; // package-private to avoid synthetic accessor
long offset; // package-private to avoid synthetic accessor
DiskPosition() {}
public void setPosition(long oplogId, long offset) {
this.oplogId = oplogId;
this.offset = offset;
}
@Override
public int hashCode() {
return Long.hashCode(oplogId ^ offset); // no boxing; same result as Long.valueOf(..).hashCode()
}
@Override
public boolean equals(Object obj) {
if (obj instanceof DiskPosition) {
DiskPosition other = (DiskPosition) obj;
return oplogId == other.oplogId && offset == other.offset;
} else {
return false;
}
}
@Override
public int compareTo(DiskPosition o) {
int result = Long.compare(oplogId, o.oplogId); // Long.compare avoids overflow-prone subtraction
if (result == 0) {
result = Long.compare(offset, o.offset);
}
return result;
}
@Override
public String toString() {
return "<" + oplogId + ':' + offset + '>';
}
}
static class DiskPage extends DiskPosition {
static final long DISK_PAGE_SIZE =
Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "DISK_PAGE_SIZE", 8 << 10);
DiskPage(DiskPosition diskPosition) {
setPosition(diskPosition.oplogId, diskPosition.offset / DISK_PAGE_SIZE);
}
}
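// Example of the page arithmetic above: with the default DISK_PAGE_SIZE of 8 KB
// (8 << 10 = 8192), offsets 16384 through 24575 in oplog 5 all map to DiskPage <5:2>,
// so entries that live on the same 8 KB page sort together and are read together.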
/**
* Returns the lock lease value to use for DistributedLock and RegionDistributedLock. A value
* of -1 is supported as a non-expiring lock.
*/
long getLockLeaseForLock() { // package-private to avoid a synthetic accessor
if (getCache().getLockLease() == -1) {
return -1;
}
return (getCache().getLockLease()) * 1000L;
}
/**
* Returns the lock timeout value to use for DistributedLock and RegionDistributedLock. A value
* of -1 is supported as a lock that never times out.
*/
long getLockTimeoutForLock(long time, TimeUnit unit) { // package-private to avoid a synthetic accessor
if (time == -1) {
return -1;
}
return TimeUnit.MILLISECONDS.convert(time, unit);
}
private class DistributedLock implements Lock {
private final Object key;
DistributedLock(Object key) {
this.key = key;
}
@Override
public void lock() {
try {
boolean locked = basicTryLock(-1, TimeUnit.MILLISECONDS, false);
if (!locked) {
lockCheckReadiness();
}
Assert.assertTrue(locked, "Failed to acquire DistributedLock");
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
lockCheckReadiness();
Assert.assertTrue(false, "Failed to acquire DistributedLock");
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
try {
boolean locked = basicTryLock(-1, TimeUnit.MILLISECONDS, true);
if (!locked) {
lockCheckReadiness();
}
Assert.assertTrue(locked, "Failed to acquire DistributedLock");
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
@Override
public boolean tryLock() {
try {
ReplyProcessor21.forceSevereAlertProcessing();
return getLockService().lock(key, 0, getLockLeaseForLock());
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
} finally {
ReplyProcessor21.unforceSevereAlertProcessing();
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return basicTryLock(time, unit, true);
}
// TODO: basicTryLock method is too complex for IDE to analyze
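// Summary of the escalation logic below (descriptive only): the lock is retried in
// waitInterval slices until the deadline passes. When the ack-severe-alert-threshold is
// enabled, the current lock holder is suspected once the ack-wait-threshold elapses; if
// the same member still holds the lock once the severe-alert threshold has elapsed as
// well, a fatal severe-alert message is logged once; if the holder changes, suspicion is
// reset and the wait begins again.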
private boolean basicTryLock(long time, TimeUnit unit, boolean interruptible)
throws InterruptedException {
// checking Thread.interrupted() is not necessary here; lockInterruptibly does this
final DistributionManager dm = getDistributionManager();
long start = System.currentTimeMillis();
long timeoutMS = getLockTimeoutForLock(time, unit);
long end;
if (timeoutMS < 0) {
timeoutMS = Long.MAX_VALUE;
end = Long.MAX_VALUE;
} else {
end = start + timeoutMS;
}
long ackSAThreshold = getSystem().getConfig().getAckSevereAlertThreshold() * 1000L;
long waitInterval;
long ackWaitThreshold;
if (ackSAThreshold > 0) {
ackWaitThreshold = getSystem().getConfig().getAckWaitThreshold() * 1000L;
waitInterval = ackWaitThreshold;
} else {
waitInterval = timeoutMS;
ackWaitThreshold = 0;
}
boolean suspected = false;
boolean severeAlertIssued = false;
DistributedMember lockHolder = null;
do {
try {
waitInterval = Math.min(end - System.currentTimeMillis(), waitInterval);
ReplyProcessor21.forceSevereAlertProcessing();
final boolean gotLock;
if (interruptible) {
gotLock =
getLockService().lockInterruptibly(key, waitInterval, getLockLeaseForLock());
} else {
gotLock = getLockService().lock(key, waitInterval, getLockLeaseForLock());
}
if (gotLock) {
return true;
}
if (ackSAThreshold > 0) {
long elapsed = System.currentTimeMillis() - start;
if (elapsed > ackWaitThreshold) {
if (!suspected) {
// start suspect processing on the holder of the lock
suspected = true;
severeAlertIssued = false; // in case this is a new lock holder
waitInterval = ackSAThreshold;
DLockRemoteToken remoteToken = ((DLockService) getLockService()).queryLock(key);
lockHolder = remoteToken.getLessee();
if (lockHolder != null) {
dm.getMembershipManager().suspectMember((InternalDistributedMember) lockHolder,
"Has not released a global region entry lock in over "
+ ackWaitThreshold / 1000 + " seconds");
}
} else if (elapsed > ackSAThreshold) {
DLockRemoteToken remoteToken = ((DLockService) getLockService()).queryLock(key);
if (lockHolder != null && remoteToken.getLessee() != null
&& lockHolder.equals(remoteToken.getLessee())) {
if (!severeAlertIssued) {
severeAlertIssued = true;
logger.fatal(
"{} seconds have elapsed waiting for global region entry lock held by {}",
ackWaitThreshold + ackSAThreshold, lockHolder);
}
} else {
// the lock holder has changed
suspected = false;
waitInterval = ackWaitThreshold;
lockHolder = null;
}
}
}
} // ackSAThreshold processing
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
} finally {
ReplyProcessor21.unforceSevereAlertProcessing();
}
} while (System.currentTimeMillis() < end);
return false;
}
@Override
public void unlock() {
try {
ReplyProcessor21.forceSevereAlertProcessing();
getLockService().unlock(key);
if (!entries.containsKey(key)) {
getLockService().freeResources(key);
}
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
} finally {
ReplyProcessor21.unforceSevereAlertProcessing();
}
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException(
"newCondition unsupported");
}
}
private class RegionDistributedLock implements Lock {
RegionDistributedLock() {}
@Override
public void lock() {
try {
boolean locked = getLockService().suspendLocking(-1);
Assert.assertTrue(locked, "Failed to acquire RegionDistributedLock");
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
// suspendLockingInterruptibly checks Thread.interrupted() to throw InterruptedException
try {
boolean locked = getLockService().suspendLockingInterruptibly(-1);
Assert.assertTrue(locked, "Failed to acquire RegionDistributedLock");
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
@Override
public boolean tryLock() {
try {
return getLockService().suspendLocking(0);
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// suspendLockingInterruptibly checks Thread.interrupted() to throw InterruptedException
try {
return getLockService().suspendLockingInterruptibly(getLockTimeoutForLock(time, unit));
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
@Override
public void unlock() {
try {
getLockService().resumeLocking();
} catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("newCondition unsupported");
}
}
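// Design note: DistributedLock above locks a single key through this region's
// DLockService, while RegionDistributedLock suspends locking for the whole service
// (suspendLocking/resumeLocking); region-wide operations such as basicClear use the
// latter, via getRegionDistributedLockIfGlobal, to serialize against per-entry locks.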
/**
* If this region's scope is GLOBAL, get the region distributed lock. The caller is responsible
* for unlocking.
*
* @return the acquired Lock if the region is GLOBAL and locking is not already suspended,
*         otherwise null.
*/
private Lock getRegionDistributedLockIfGlobal() throws TimeoutException {
if (getScope().isGlobal()) {
if (isLockingSuspendedByCurrentThread()) {
return null;
}
Lock dlock = getRegionDistributedLock();
dlock.lock(); // caller is expected to use a try-finally to unlock
return dlock;
}
return null;
}
/**
* Distribute the PutAllOp. This implementation distributes it to peers.
*
* @return the operation token; a value greater than 0 means startOperation finished distribution
* @since GemFire 5.7
*/
@Override
public long postPutAllSend(DistributedPutAllOperation putAllOp,
VersionedObjectList successfulPuts) {
long token = -1;
if (putAllOp.putAllDataSize > 0) {
token = putAllOp.startOperation();
} else {
if (logger.isDebugEnabled()) {
logger.debug("DR.postPutAll: no data to distribute");
}
}
return token;
}
@Override
public long postRemoveAllSend(DistributedRemoveAllOperation op,
VersionedObjectList successfulOps) {
long token = -1;
if (op.removeAllDataSize > 0) {
token = op.startOperation();
} else {
getCache().getLogger().fine("DR.postRemoveAll: no data to distribute");
}
return token;
}
@Override
VersionedObjectList basicPutAll(final Map<?, ?> map,
final DistributedPutAllOperation putAllOp, final Map<Object, VersionTag> retryVersions) {
Lock dlock = getRegionDistributedLockIfGlobal();
try {
return super.basicPutAll(map, putAllOp, retryVersions);
} finally {
if (dlock != null) {
dlock.unlock();
}
}
}
@Override
public VersionedObjectList basicRemoveAll(final Collection<Object> keys,
final DistributedRemoveAllOperation removeAllOp, final List<VersionTag> retryVersions) {
Lock dlock = getRegionDistributedLockIfGlobal();
try {
return super.basicRemoveAll(keys, removeAllOp, retryVersions);
} finally {
if (dlock != null) {
dlock.unlock();
}
}
}
/**
* Returns the missing required roles after waiting up to the timeout.
*
* @param timeout milliseconds to wait; -1 waits indefinitely and 0 returns immediately
* @return the set of required roles that are still missing
* @throws IllegalStateException if the region is not configured with required roles
*/
public Set waitForRequiredRoles(long timeout) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
checkReadiness();
if (!getMembershipAttributes().hasRequiredRoles()) {
throw new IllegalStateException(
"Region has not been configured with required roles.");
}
if (!isMissingRequiredRoles) { // should we delete this check?
if (logger.isDebugEnabled()) {
logger.debug("No missing required roles to wait for.");
}
return Collections.emptySet(); // early-out: no missing required roles
}
if (timeout != 0) { // if timeout is zero then fall through past waits
if (timeout == -1) { // infinite timeout
while (isMissingRequiredRoles) {
checkReadiness();
// bail if distribution has stopped
cache.getCancelCriterion().checkCancelInProgress(null);
synchronized (missingRequiredRoles) {
// one more check while synced
if (isMissingRequiredRoles) {
if (logger.isDebugEnabled()) {
logger.debug("About to wait for missing required roles.");
}
// TODO an infinite wait here might be a problem...
missingRequiredRoles.wait(); // spurious wakeup ok
}
}
}
} else { // use the timeout
long endTime = System.currentTimeMillis() + timeout;
while (isMissingRequiredRoles) {
checkReadiness();
// bail if distribution has stopped
cache.getCancelCriterion().checkCancelInProgress(null);
synchronized (missingRequiredRoles) {
// one more check while synced
if (isMissingRequiredRoles) {
long timeToWait = endTime - System.currentTimeMillis();
if (timeToWait > 0) {
if (logger.isDebugEnabled()) {
logger.debug("About to wait up to {} milliseconds for missing required roles.",
timeToWait);
}
missingRequiredRoles.wait(timeToWait); // spurious wakeup ok
} else {
break;
}
}
}
}
}
}
// check readiness again: thread may have been notified at destroy time
checkReadiness();
if (isMissingRequiredRoles) {
// sync on missingRequiredRoles to prevent mods to required role status...
synchronized (missingRequiredRoles) {
return Collections.unmodifiableSet(new HashSet<>(missingRequiredRoles));
}
} else {
return Collections.emptySet();
}
}
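// Illustrative (hypothetical) caller of waitForRequiredRoles: wait up to 30 seconds
// for the required roles, then decide whether it is safe to proceed:
//
//   Set missing = region.waitForRequiredRoles(30 * 1000L);
//   if (!missing.isEmpty()) {
//     // roles are still absent; operations may throw RegionAccessException
//   }
//
// A timeout of -1 waits indefinitely; 0 returns the current state immediately.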
/** Returns true if the role is currently present in this region's membership. */
public boolean isRoleInRegionMembership(Role role) {
checkReadiness();
return basicIsRoleInRegionMembership(role);
}
private boolean basicIsRoleInRegionMembership(Role role) {
if (getSystem().getDistributedMember().getRoles().contains(role)) {
// since we are playing the role
return true;
}
for (DistributedMember member : distAdvisor.adviseGeneric()) {
Set<Role> roles = member.getRoles();
if (roles.contains(role)) {
return true;
}
}
return false;
}
@Override
public void remoteRegionInitialized(CacheProfile profile) {
synchronized (advisorListener) {
if (advisorListener.members == null && hasListener()) {
Object callback = TEST_HOOK_ADD_PROFILE ? profile : null;
RegionEventImpl event = new RegionEventImpl(this, Operation.REGION_CREATE, callback, true,
profile.peerMemberId);
dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_CREATE, event);
}
}
}
@Override
void removeSenderFromAdvisor(InternalDistributedMember sender, int serial,
boolean regionDestroyed) {
getDistributionAdvisor().removeIdWithSerial(sender, serial, regionDestroyed);
}
/** Doesn't throw RegionDestroyedException; used by CacheDistributionAdvisor */
@Override
public DistributionAdvisee getParentAdvisee() {
return (DistributionAdvisee) basicGetParentRegion();
}
/**
* Used to get membership events from our advisor to implement RegionMembershipListener
* invocations.
*
* @since GemFire 5.0
*/
protected class AdvisorListener implements MembershipListener {
Set<InternalDistributedMember> members = new HashSet<>(); // package-private to avoid synthetic
// accessor
protected boolean destroyed = false;
synchronized void addMembers(Set<InternalDistributedMember> newMembers) {
members.addAll(newMembers);
}
protected synchronized Set<InternalDistributedMember> getInitialMembers() {
Set<InternalDistributedMember> initMembers = members;
members = null;
return initMembers;
}
@Override
public void quorumLost(DistributionManager distributionManager,
Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
// do nothing
}
@Override
public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
InternalDistributedMember whoSuspected, String reason) {
// do nothing
}
/** called when membership listeners are added after region creation */
synchronized void initRMLWrappers() {
Set<InternalDistributedMember> membersWithThisRegion =
distAdvisor.adviseGeneric();
initPostCreateRegionMembershipListeners(membersWithThisRegion);
}
@Override
public synchronized void memberJoined(DistributionManager distributionManager,
InternalDistributedMember id) {
if (destroyed) {
return;
}
if (members != null) {
members.add(id);
}
// bug #44684 - do not notify listener of create until remote member is initialized
if (getMembershipAttributes().hasRequiredRoles()) {
// newlyAcquiredRoles is used for intersection and RoleEvent
Set<Role> newlyAcquiredRoles = Collections.emptySet();
synchronized (missingRequiredRoles) {
if (isMissingRequiredRoles) {
Set<Role> roles = id.getRoles();
newlyAcquiredRoles = new HashSet<>(missingRequiredRoles);
newlyAcquiredRoles.retainAll(roles); // find the intersection
if (!newlyAcquiredRoles.isEmpty()) {
missingRequiredRoles.removeAll(newlyAcquiredRoles);
if (members == null && missingRequiredRoles.isEmpty()) {
isMissingRequiredRoles = false;
getCachePerfStats().incReliableRegionsMissing(-1);
if (getMembershipAttributes().getLossAction().isAllAccess()) {
getCachePerfStats().incReliableRegionsMissingFullAccess(-1);
} else if (getMembershipAttributes().getLossAction().isLimitedAccess()) {
getCachePerfStats().incReliableRegionsMissingLimitedAccess(-1);
} else if (getMembershipAttributes().getLossAction().isNoAccess()) {
getCachePerfStats().incReliableRegionsMissingNoAccess(-1);
}
boolean async = resumeReliability(id, newlyAcquiredRoles);
if (async) {
destroyed = true;
}
}
}
}
if (!destroyed) {
// any number of threads may be waiting on missingRequiredRoles
missingRequiredRoles.notifyAll();
}
}
if (!destroyed && members == null && hasListener()) {
if (!newlyAcquiredRoles.isEmpty()) {
// fire afterRoleGain event
RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this,
Operation.REGION_CREATE, null, true, id, newlyAcquiredRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_GAIN, relEvent);
}
}
}
}
@Override
public synchronized void memberDeparted(DistributionManager distributionManager,
InternalDistributedMember id, boolean crashed) {
if (destroyed) {
return;
}
if (members != null) {
members.remove(id);
}
if (members == null && hasListener()) {
RegionEventImpl event =
new RegionEventImpl(DistributedRegion.this, Operation.REGION_CLOSE, null, true, id);
if (crashed) {
dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_CRASH, event);
} else {
// @todo darrel: it would be nice to know what actual op was done
// could be close, local destroy, or destroy (or load snap?)
if (DestroyRegionOperation.isRegionDepartureNotificationOk()) {
dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_DEPARTURE, event);
}
}
}
if (getMembershipAttributes().hasRequiredRoles()) {
Set<Role> newlyMissingRoles = Collections.emptySet();
synchronized (missingRequiredRoles) {
Set<Role> roles = id.getRoles();
for (Role role : roles) {
if (getMembershipAttributes().getRequiredRoles().contains(role)
&& !basicIsRoleInRegionMembership(role)) {
if (newlyMissingRoles.isEmpty()) {
newlyMissingRoles = new HashSet<>();
}
newlyMissingRoles.add(role);
if (members == null && !isMissingRequiredRoles) {
isMissingRequiredRoles = true;
getCachePerfStats().incReliableRegionsMissing(1);
if (getMembershipAttributes().getLossAction().isAllAccess()) {
getCachePerfStats().incReliableRegionsMissingFullAccess(1);
} else if (getMembershipAttributes().getLossAction().isLimitedAccess()) {
getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
} else if (getMembershipAttributes().getLossAction().isNoAccess()) {
getCachePerfStats().incReliableRegionsMissingNoAccess(1);
}
boolean async = lostReliability(id, newlyMissingRoles);
if (async) {
destroyed = true;
}
}
}
}
if (!destroyed) {
missingRequiredRoles.addAll(newlyMissingRoles);
// any number of threads may be waiting on missingRequiredRoles...
missingRequiredRoles.notifyAll();
}
}
if (!destroyed && members == null && hasListener()) {
if (!newlyMissingRoles.isEmpty()) {
// fire afterRoleLoss event
RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this,
Operation.REGION_CLOSE, null, true, id, newlyMissingRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
}
}
}
}
}
/**
* Used to bootstrap txState.
*
* @return the member hosting the primary bucket, for partitioned regions
*/
@Override
public DistributedMember getOwnerForKey(KeyInfo key) {
assert !isInternalRegion() || isMetaRegionWithTransactions();
if (!getAttributes().getDataPolicy().withStorage() || (getConcurrencyChecksEnabled()
&& getAttributes().getDataPolicy() == DataPolicy.NORMAL)) {
// execute on random replicate
return getRandomReplicate();
}
// if we are non-persistent, forward transactions to
// a persistent member
if (getConcurrencyChecksEnabled() && !generateVersionTag) {
return getRandomPersistentReplicate();
}
return super.getOwnerForKey(key);
}
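/*
* Decision sketch for getOwnerForKey above (a descriptive note, not additional behavior):
* - no local storage, or NORMAL data policy with concurrency checks: pick a random replicate;
* - concurrency checks enabled but this member does not generate version tags: pick a random
* persistent replicate, so a durable member owns the transactional state;
* - otherwise defer to the superclass resolution.
*/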
/**
* Execute the provided named function in all locations that contain the given keys. Depending
* on the membership, the function may be executed on a single node, on a subset of nodes in
* parallel, or in parallel across all the nodes.
*
* @since GemFire 5.8
*/
@Override
public ResultCollector executeFunction(final DistributedRegionFunctionExecutor execution,
final Function function, final Object args, final ResultCollector rc, final Set filter,
final ServerToClientFunctionResultSender sender) {
DistributedMember target = getTransactionalNode();
if (target != null) {
if (target.equals(getMyId())) {
return executeLocally(execution, function, args, 0, rc, filter, sender);
}
return executeOnReplicate(execution, function, args, rc, filter, target);
} else if (getAttributes().getDataPolicy().withReplication()
|| getAttributes().getDataPolicy().withPreloaded()) {
// execute locally
final Set<InternalDistributedMember> singleMember = Collections.singleton(getMyId());
execution.validateExecution(function, singleMember);
execution.setExecutionNodes(singleMember);
return executeLocally(execution, function, args, 0, rc, filter, sender);
} else {
// select a random replicate
target = getRandomReplicate();
if (target == null) {
throw new FunctionException(
String.format("No Replicated Region found for executing function : %s.",
function.getId()));
}
}
final LocalResultCollector<?, ?> localRC = execution.getLocalResultCollector(function, rc);
return executeOnReplicate(execution, function, args, localRC, filter, target);
}
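/*
* Illustrative caller-side sketch (not part of this class): function execution on a
* distributed region normally enters through the public FunctionService API, which routes to
* executeFunction above. The region name "exampleRegion" and the function id "ExampleFn" are
* hypothetical.
*
*   Execution execution = FunctionService.onRegion(cache.getRegion("exampleRegion"))
*       .withFilter(Collections.singleton("someKey"))
*       .setArguments("someArgs");
*   ResultCollector<?, ?> collector = execution.execute("ExampleFn");
*   Object results = collector.getResult();
*/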
private ResultCollector executeOnReplicate(final DistributedRegionFunctionExecutor execution,
final Function function, final Object args, ResultCollector rc, final Set filter,
final DistributedMember target) {
final Set<InternalDistributedMember> singleMember =
Collections.singleton((InternalDistributedMember) target);
execution.validateExecution(function, singleMember);
execution.setExecutionNodes(singleMember);
HashMap<InternalDistributedMember, Object> memberArgs = new HashMap<>();
memberArgs.put((InternalDistributedMember) target,
execution.getArgumentsForMember(target.getId()));
ResultSender resultSender = new DistributedRegionFunctionResultSender(null, rc, function,
execution.getServerResultSender());
DistributedRegionFunctionResultWaiter waiter =
new DistributedRegionFunctionResultWaiter(getSystem(), getFullPath(), rc,
function, filter, Collections.singleton(target), memberArgs, resultSender);
rc = waiter.getFunctionResultFrom(Collections.singleton(target), function, execution);
return rc;
}
/**
* @return the node on which a transaction is already in progress, or null if there is none
*/
private DistributedMember getTransactionalNode() {
if (cache.getTxManager().getTXState() != null) {
return cache.getTxManager().getTXState().getTarget();
}
return null;
}
/**
* Implementation of {@link ProfileVisitor} that selects a random replicated member from the
* available ones for this region.
*/
static class GetRandomReplicate implements ProfileVisitor<DistributedMember> {
private boolean onlyPersistent = false;
InternalDistributedMember member = null;
private int randIndex = -1;
GetRandomReplicate() {}
GetRandomReplicate(boolean onlyPersistent) {
this.onlyPersistent = onlyPersistent;
}
@Override
public boolean visit(DistributionAdvisor advisor, Profile profile, int profileIndex,
int numProfiles, DistributedMember member) {
final CacheProfile cp = (CacheProfile) profile;
if (randIndex < 0) {
randIndex = PartitionedRegion.RANDOM.nextInt(numProfiles);
}
if (cp.dataPolicy.withReplication() && cp.regionInitialized) {
if (onlyPersistent && !cp.dataPolicy.withPersistence()) {
return true;
}
// store the last replicated member in any case since in the worst case
// there may be no replicated node after "randIndex" in which case the
// last visited member will be used
this.member = cp.getDistributedMember();
return profileIndex < randIndex;
}
return true;
}
}
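/*
* Worked example of the visit above: with numProfiles = 5 and randIndex drawn as 3, the visit
* keeps going while profileIndex < 3, remembering the most recent initialized replicate in
* 'member'. If no replicate appears at or after index 3, the last replicate seen earlier is
* used instead, so a replicate is always returned when one exists (the choice is random but
* not necessarily uniform).
*/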
/**
* @return a random replicate, or null if there is none
*/
private InternalDistributedMember getRandomReplicate() {
/*
* The old code created an unnecessary HashSet, populated it with all replicates (which may be
* a large set), copied it into an array, and then selected a random element from that. The
* current approach uses a much more efficient visitor instead.
*/
final GetRandomReplicate getReplicate = new GetRandomReplicate();
getCacheDistributionAdvisor().accept(getReplicate, null);
return getReplicate.member;
}
/**
* @return a random persistent replicate, or null if there is none
*/
private InternalDistributedMember getRandomPersistentReplicate() {
final GetRandomReplicate getPersistentReplicate = new GetRandomReplicate(true);
getCacheDistributionAdvisor().accept(getPersistentReplicate, null);
return getPersistentReplicate.member;
}
void executeOnRegion(DistributedRegionFunctionStreamingMessage msg, final Function function,
final Object args, int prid, final Set filter, boolean isReExecute) throws IOException {
final DistributionManager dm = getDistributionManager();
ResultSender resultSender = new DistributedRegionFunctionResultSender(dm, msg, function);
final RegionFunctionContextImpl context = new RegionFunctionContextImpl(cache, function.getId(),
this, args, filter, null, null, resultSender, isReExecute);
FunctionStats stats = FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem());
long start = stats.startFunctionExecution(function.hasResult());
try {
function.execute(context);
stats.endFunctionExecution(start, function.hasResult());
} catch (FunctionException functionException) {
if (logger.isDebugEnabled()) {
logger.debug("FunctionException occurred on remote node while executing Function: {}",
function.getId(), functionException);
}
stats.endFunctionExecutionWithException(start, function.hasResult());
throw functionException;
} catch (CacheClosedException cacheClosedexception) {
if (logger.isDebugEnabled()) {
logger.debug("CacheClosedException occurred on remote node while executing Function: {}",
function.getId(), cacheClosedexception);
}
throw cacheClosedexception;
} catch (Exception exception) {
if (logger.isDebugEnabled()) {
logger.debug("Exception occurred on remote node while executing Function: {}",
function.getId(), exception);
}
stats.endFunctionExecutionWithException(start, function.hasResult());
throw new FunctionException(exception);
}
}
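/*
* Note on the stats bracketing above: the success path pairs startFunctionExecution with
* endFunctionExecution; the FunctionException and generic Exception paths end with
* endFunctionExecutionWithException; CacheClosedException is rethrown without recording an
* execution end at all.
*/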
private ResultCollector executeLocally(final DistributedRegionFunctionExecutor execution,
final Function function, final Object args, int prid,
final ResultCollector rc,
final Set filter,
final ServerToClientFunctionResultSender sender) {
final LocalResultCollector<?, ?> localRC = execution.getLocalResultCollector(function, rc);
final DistributionManager dm = getDistributionManager();
final DistributedRegionFunctionResultSender resultSender =
new DistributedRegionFunctionResultSender(dm, localRC, function, sender);
final RegionFunctionContextImpl context = new RegionFunctionContextImpl(cache, function.getId(),
this, args, filter, null, null, resultSender, execution.isReExecute());
execution.executeFunctionOnLocalNode(function, context, resultSender, dm, isTX());
return localRC;
}
@Override
void setMemoryThresholdFlag(MemoryEvent event) {
Set<InternalDistributedMember> others = getCacheDistributionAdvisor().adviseGeneric();
if (event.isLocal() || others.contains(event.getMember())) {
if (event.getState().isCritical() && !event.getPreviousState().isCritical()
&& (event.getType() == ResourceType.HEAP_MEMORY
|| (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
addCriticalMember(event.getMember());
} else if (!event.getState().isCritical() && event.getPreviousState().isCritical()
&& (event.getType() == ResourceType.HEAP_MEMORY
|| (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
removeCriticalMember(event.getMember());
}
}
}
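/*
* Transition summary for setMemoryThresholdFlag above: a member is added to the critical list
* only on the edge from non-critical to critical, and removed only on the reverse edge; both
* apply to heap events always, and to off-heap events only when this region uses off-heap
* storage. Events from remote members that do not host this region are ignored.
*/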
@Override
public void removeCriticalMember(DistributedMember member) {
if (logger.isDebugEnabled()) {
logger.debug("DR: removing member {} from critical member list", member);
}
synchronized (memoryThresholdReachedMembers) {
memoryThresholdReachedMembers.remove(member);
if (memoryThresholdReachedMembers.isEmpty()) {
setMemoryThresholdReached(false);
}
}
}
@Override
void initialCriticalMembers(boolean localMemoryIsCritical,
Set<InternalDistributedMember> criticalMembers) {
Set<InternalDistributedMember> others = getCacheDistributionAdvisor().adviseGeneric();
for (InternalDistributedMember idm : criticalMembers) {
if (others.contains(idm)) {
addCriticalMember(idm);
}
}
}
/**
* @param idm member whose threshold has been exceeded
*/
protected void addCriticalMember(final DistributedMember idm) {
synchronized (memoryThresholdReachedMembers) {
if (memoryThresholdReachedMembers.isEmpty()) {
setMemoryThresholdReached(true);
}
memoryThresholdReachedMembers.add(idm);
}
}
@Override
public MemoryThresholdInfo getAtomicThresholdInfo() {
if (!isMemoryThresholdReached()) {
return MemoryThresholdInfo.getNotReached();
}
synchronized (memoryThresholdReachedMembers) {
return new MemoryThresholdInfo(isMemoryThresholdReached(),
new HashSet<>(memoryThresholdReachedMembers));
}
}
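/*
* Note on the snapshot above: the HashSet copy is taken while synchronized on
* memoryThresholdReachedMembers, the same monitor used by addCriticalMember and
* removeCriticalMember, so callers receive a stable view of the critical set.
*/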
/**
* Fetch Version for the given key from a remote replicate member.
*
* @throws EntryNotFoundException if the entry is not found on the replicate member
* @return VersionTag for the key
*/
protected VersionTag fetchRemoteVersionTag(Object key) {
VersionTag tag = null;
assert getDataPolicy() != DataPolicy.REPLICATE;
final TXStateProxy tx = cache.getTXMgr().pauseTransaction();
try {
boolean retry = true;
InternalDistributedMember member = getRandomReplicate();
while (retry) {
try {
if (member == null) {
break;
}
FetchVersionResponse response = RemoteFetchVersionMessage.send(member, this, key);
tag = response.waitForResponse();
retry = false;
} catch (RemoteOperationException e) {
member = getRandomReplicate();
if (member != null) {
if (logger.isDebugEnabled()) {
logger.debug("Retrying RemoteFetchVersionMessage on member:{}", member);
}
}
}
}
} finally {
cache.getTXMgr().unpauseTransaction(tx);
}
return tag;
}
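/*
* Retry behavior of the loop above: on RemoteOperationException another random replicate is
* chosen and the fetch is retried; once no replicate remains the loop exits and null is
* returned, leaving the caller to handle a null tag. The pause/unpause around the fetch keeps
* it outside any in-progress transaction.
*/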
/**
* Test hook for bug 48578. Returns true if this region has a net loader, false otherwise.
*/
public boolean hasNetLoader() {
return hasNetLoader(getCacheDistributionAdvisor());
}
@Override
long getLatestLastAccessTimeFromOthers(Object key) {
LatestLastAccessTimeOperation<?> op = new LatestLastAccessTimeOperation<>(this, key);
return op.getLatestLastAccessTime();
}
@Override
public Set adviseNetWrite() {
return getCacheDistributionAdvisor().adviseNetWrite();
}
@VisibleForTesting
public SenderIdMonitor getSenderIdMonitor() {
return senderIdMonitor;
}
}