/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.CacheLoader;
import com.gemstone.gemfire.cache.CacheLoaderException;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.LossAction;
import com.gemstone.gemfire.cache.MembershipAttributes;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.RegionAccessException;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.RegionDistributionException;
import com.gemstone.gemfire.cache.RegionMembershipListener;
import com.gemstone.gemfire.cache.ResumptionAction;
import com.gemstone.gemfire.cache.RoleException;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.cache.execute.ResultSender;
import com.gemstone.gemfire.cache.persistence.PersistentReplicatesOfflineException;
import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.distributed.DistributedLockService;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.LockServiceDestroyedException;
import com.gemstone.gemfire.distributed.Role;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.ProfileVisitor;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.MembershipListener;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.locks.DLockRemoteToken;
import com.gemstone.gemfire.distributed.internal.locks.DLockService;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus;
import com.gemstone.gemfire.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionExecutor;
import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultSender;
import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultWaiter;
import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
import com.gemstone.gemfire.internal.cache.execute.RegionFunctionContextImpl;
import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender;
import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
import com.gemstone.gemfire.internal.cache.persistence.CreatePersistentRegionProcessor;
import com.gemstone.gemfire.internal.cache.persistence.PersistenceAdvisor;
import com.gemstone.gemfire.internal.cache.persistence.PersistenceAdvisorImpl;
import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID;
import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberManager;
import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberView;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
import com.gemstone.org.jgroups.util.StringId;
/**
* A {@link LocalRegion} whose operations are distributed to, and whose
* contents are kept consistent with, peer members of the distributed system.
*
* @author Eric Zoerner
* @author Sudhir Menon
*/
@SuppressWarnings("deprecation")
public class DistributedRegion extends LocalRegion implements
CacheDistributionAdvisee
{
private static final Logger logger = LogService.getLogger();
/** causes cache profile to be added to afterRemoteRegionCreate notification for testing */
public static boolean TEST_HOOK_ADD_PROFILE = false;
/** Used to sync accesses to this.dlockService to allow lazy construction */
private final Object dlockMonitor = new Object();
final CacheDistributionAdvisor distAdvisor;
/**
* @guarded.By {@link #dlockMonitor}
*/
private DistributedLockService dlockService;
protected final AdvisorListener advisorListener = new AdvisorListener();
/** Set of currently missing required roles */
protected final HashSet missingRequiredRoles = new HashSet();
/** True if this region is currently missing any required roles */
protected volatile boolean isMissingRequiredRoles = false;
/**
* True if this region has any required roles defined and the LossAction is
* either NO_ACCESS or LIMITED_ACCESS. Reliability checks will only happen if
* this is true.
*/
private final boolean requiresReliabilityCheck;
/**
* Provides a queue for reliable message delivery
*
* @since 5.0
*/
protected final ReliableMessageQueue rmq;
/**
* Latch that is opened after initialization waits for required roles up to
* the <a href="DistributedSystem#member-timeout">member-timeout</a>.
*/
protected final StoppableCountDownLatch initializationLatchAfterMemberTimeout;
private final PersistenceAdvisor persistenceAdvisor;
private final PersistentMemberID persistentId;
/**
* This boolean is set to false when this region is non-persistent but there
* are persistent members in the distributed system to which all region
* modifications should be forwarded; see bug 45186.
*/
private volatile boolean generateVersionTag = true;
/** Tests can set this to true and ignore reliability triggered reconnects */
public static boolean ignoreReconnect = false;
/**
* Lock to prevent multiple threads on this member from performing
* a clear at the same time.
*/
private final Object clearLock = new Object();
private static AtomicBoolean loggedNetworkPartitionWarning = new AtomicBoolean(false);
/** Creates a new instance of DistributedRegion */
protected DistributedRegion(String regionName, RegionAttributes attrs,
LocalRegion parentRegion, GemFireCacheImpl cache,
InternalRegionArguments internalRegionArgs) {
super(regionName, attrs, parentRegion, cache, internalRegionArgs);
this.initializationLatchAfterMemberTimeout = new StoppableCountDownLatch(
getCancelCriterion(), 1);
this.distAdvisor = createDistributionAdvisor(internalRegionArgs);
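// warn if network-partition detection is enabled but this region's scope
// and storage settings cannot guarantee consistency during a split-brain,
// and warn (once per VM) if detection is disabled for a persistent region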
if (getDistributionManager().getConfig().getEnableNetworkPartitionDetection()
&& !isInternalRegion() && !attrs.getScope().isAck() && !doesNotDistribute() && attrs.getDataPolicy().withStorage()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.DistributedRegion_REGION_0_1_SPLITBRAIN_CONFIG_WARNING,
new Object[] { regionName, attrs.getScope() }));
}
if (!getDistributionManager().getConfig().getEnableNetworkPartitionDetection()
&& attrs.getDataPolicy().withPersistence() && !loggedNetworkPartitionWarning.getAndSet(true)) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.DistributedRegion_REGION_0_ENABLE_NETWORK_PARTITION_WARNING,
new Object[] { regionName, attrs.getScope() }));
}
boolean setRequiresReliabilityCheck = attrs.getMembershipAttributes()
.hasRequiredRoles()
&&
// note that the following includes NO_ACCESS and LIMITED_ACCESS
!attrs.getMembershipAttributes().getLossAction().isAllAccess()
&& !attrs.getMembershipAttributes().getLossAction().isReconnect();
// this optimization is safe as long as Roles and Required Roles are
// immutable
// if this VM fulfills all required roles, make requiresReliabilityCheck
// false
Set reqRoles = new HashSet(attrs.getMembershipAttributes()
.getRequiredRoles());
reqRoles.removeAll(getSystem().getDistributedMember().getRoles());
if (reqRoles.isEmpty()) {
setRequiresReliabilityCheck = false;
}
this.requiresReliabilityCheck = setRequiresReliabilityCheck;
{
ReliableMessageQueue tmp = null;
if (this.requiresReliabilityCheck) {
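// reliable-message queuing is currently disabled, so tmp stays null and
// this.rmq ends up null even when reliability checks are required
// (see the commented-out factory call below)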
// if
// (attrs.getMembershipAttributes().getLossAction().isAllAccessWithQueuing())
// {
// tmp = cache.getReliableMessageQueueFactory().create(this);
// }
}
this.rmq = tmp;
}
if(internalRegionArgs.isUsedForPartitionedRegionBucket()) {
this.persistenceAdvisor = internalRegionArgs.getPersistenceAdvisor();
} else if (this.allowsPersistence()){
//TODO prpersist - using this lock service is a hack. Maybe? Or maybe
//it's ok if we have one (rarely used) lock service for many operations?
//What does the resource manager do?
DistributedLockService dl = cache.getPartitionedRegionLockService();
try {
//TODO prpersist - this is just a quick and dirty storage mechanism so that
//I can test the storage.
DiskRegionStats diskStats;
PersistentMemberView storage;
if(getDataPolicy().withPersistence()) {
storage = getDiskRegion();
diskStats = getDiskRegion().getStats();
} else {
storage = new InMemoryPersistentMemberView();
diskStats = null;
}
PersistentMemberManager memberManager = cache.getPersistentMemberManager();
this.persistenceAdvisor = new PersistenceAdvisorImpl(distAdvisor, dl, storage, this.getFullPath(), diskStats, memberManager);
} catch (Exception e) {
throw new InternalGemFireError("Couldn't recover persistence", e);
}
} else {
this.persistenceAdvisor = null;
}
if(this.persistenceAdvisor != null) {
this.persistentId = persistenceAdvisor.generatePersistentID();
} else {
this.persistentId = null;
}
}
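/**
* Creates and starts the event tracker this region uses to recognize
* events it has already seen (e.g. retried operations).
*/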
@Override
public void createEventTracker() {
this.eventTracker = new EventTracker(this);
this.eventTracker.start();
}
/**
* Intended for use during construction of a DistributedRegion
*
* @return the advisor to be used by the region
*/
protected CacheDistributionAdvisor createDistributionAdvisor(InternalRegionArguments internalRegionArgs) {
return CacheDistributionAdvisor.createCacheDistributionAdvisor(this); // Warning: potential early escape of object before full construction
}
/**
* Does this region support persistence?
*/
public boolean allowsPersistence() {
return true;
}
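/**
* Returns true if this operation must first be sent one-hop to a member
* that can generate a version tag for it, because this member either lacks
* the entry or is not allowed to generate version tags itself.
*/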
@Override
public boolean requiresOneHopForMissingEntry(EntryEventImpl event) {
// received from another member - don't use one-hop
if (event.isOriginRemote()) {
return false;
}
// local ops aren't distributed
if (event.getOperation().isLocal()) {
return false;
}
// if it already has a valid version tag it can go out with a DistributedCacheOperation
if (event.getVersionTag() != null && event.getVersionTag().getRegionVersion() > 0) {
return false;
}
// if we're not allowed to generate a version tag we need to send it to someone who can
if (!this.generateVersionTag) {
return true;
}
return this.concurrencyChecksEnabled &&
(this.srp == null) &&
!isTX() &&
this.scope.isDistributed() &&
!this.dataPolicy.withReplication();
}
/**
* @see LocalRegion#virtualPut(EntryEventImpl, boolean, boolean, Object,
* boolean, long, boolean)
*/
@Override
protected
boolean virtualPut(EntryEventImpl event,
boolean ifNew,
boolean ifOld,
Object expectedOldValue,
boolean requireOldValue,
long lastModified,
boolean overwriteDestroyed)
throws TimeoutException,
CacheWriterException {
final boolean isTraceEnabled = logger.isTraceEnabled();
Lock dlock = null;
if (this.scope.isGlobal() && // lock only applies to global scope
!event.isOriginRemote() && // only if operation originating locally
!event.isNetSearch() && // search and load processor handles own locking
!event.isNetLoad() &&
// @todo darrel/kirk: what about putAll?
!event.isLocalLoad() &&
!event.isSingleHopPutOp()) { // Single Hop Op means dlock is already taken at origin node.
dlock = this.getDistributedLockIfGlobal(event.getKey());
}
if (isTraceEnabled) {
logger.trace("virtualPut invoked for event {}", event);
}
try {
if (!hasSeenEvent(event)) {
if (this.requiresOneHopForMissingEntry(event)) {
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
if (!event.isBulkOpInProgress() || this.dataPolicy.withStorage()) {
// putAll will send a single one-hop for empty regions. for other missing entries
// we need to get a valid version number before modifying the local cache
boolean didDistribute = RemotePutMessage.distribute(event, lastModified,
false, false, expectedOldValue, requireOldValue, !this.generateVersionTag);
if (!didDistribute && isTraceEnabled) {
logger.trace("Unable to perform one-hop messaging");
}
if (!this.generateVersionTag && !didDistribute) {
throw new PersistentReplicatesOfflineException();
}
if (didDistribute) {
if (isTraceEnabled) {
logger.trace("Event after remotePut operation: {}", event);
}
if (event.getVersionTag() == null) {
// if the event wasn't applied by the one-hop replicate it will not have a version tag
// and so should not be applied to this cache
return false;
}
}
}
}
}
return super.virtualPut(event,
ifNew,
ifOld,
expectedOldValue,
requireOldValue,
lastModified,
overwriteDestroyed);
}
else {
if (event.getDeltaBytes() != null && event.getRawNewValue() == null) {
// This means that this event has delta bytes but no full value.
// Request the full value of this event.
// The value in this vm may not be same as this event's value.
throw new InvalidDeltaException(
"Cache encountered replay of event containing delta bytes for key "
+ event.getKey());
}
// if the listeners have already seen this event, then it has already
// been successfully applied to the cache. Distribute the message and
// return.
if (isTraceEnabled) {
logger.trace("DR.virtualPut: this cache has already seen this event {}", event);
}
// Gester, Fix 39014: when hasSeenEvent, put will still distribute
// event, but putAll did not. We add the logic back here, not to put
// back into DR.distributeUpdate() because we moved this part up into
// LR.basicPutPart3 on purpose. Reviewed by Bruce.
if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
event.getPutAllOperation().addEntry(event, true);
}
/* doing this so that other VMs will apply this no matter what. If it
* is an "update" they will not apply it if they don't have the key.
* Because this is probably a retry, it will never get applied to this
* local AbstractRegionMap, and so will never be flipped to a 'create'
*/
event.makeCreate();
distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
event.invokeCallbacks(this,true, true);
return true;
}
}
finally {
if (dlock != null) {
dlock.unlock();
}
}
}
@Override
protected RegionEntry basicPutEntry(EntryEventImpl event, long lastModified)
throws TimeoutException, CacheWriterException {
final boolean isTraceEnabled = logger.isTraceEnabled();
if (isTraceEnabled) {
logger.trace("basicPutEntry invoked for event {}", event);
}
if (this.requiresOneHopForMissingEntry(event)) {
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
final boolean ifNew = false;
final boolean ifOld = false;
boolean didDistribute = RemotePutMessage.distribute(event, lastModified,
ifNew, ifOld, null, false, !this.generateVersionTag);
if (!this.generateVersionTag && !didDistribute) {
throw new PersistentReplicatesOfflineException();
}
if (didDistribute && isTraceEnabled) {
logger.trace("Event after remotePut for basicPutEntry: {}", event);
}
}
}
return super.basicPutEntry(event, lastModified);
}
@Override
public void performPutAllEntry(EntryEventImpl event) {
/*
* force shared data view so that we just do the virtual op, accruing things in the put all operation for later
*/
if(isTX()) {
event.getPutAllOperation().addEntry(event);
} else {
getSharedDataView().putEntry(event, false, false, null, false, 0L, false);
}
}
@Override
public void performRemoveAllEntry(EntryEventImpl event) {
// force shared data view so that we just do the virtual op, accruing things in the bulk operation for later
if(isTX()) {
event.getRemoveAllOperation().addEntry(event);
} else {
basicDestroy(event, true, null);
//getSharedDataView().destroyExistingEntry(event, true, null);
}
}
/**
* distribution and listener notification
*/
@Override
public void basicPutPart3(EntryEventImpl event, RegionEntry entry,
boolean isInitialized, long lastModified, boolean invokeCallbacks,
boolean ifNew, boolean ifOld, Object expectedOldValue,
boolean requireOldValue) {
distributeUpdate(event, lastModified, false, false, null, false);
super.basicPutPart3(event, entry, isInitialized, lastModified,
invokeCallbacks, ifNew, ifOld, expectedOldValue, requireOldValue);
}
/** distribute an update operation */
protected void distributeUpdate(EntryEventImpl event, long lastModified, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue) {
// an update from a netSearch is not distributed
if (!event.isOriginRemote() && !event.isNetSearch() && !event.isBulkOpInProgress()) {
boolean distribute = true;
if (event.getInhibitDistribution()) {
// this has already been distributed by a one-hop operation
distribute = false;
}
if (distribute) {
UpdateOperation op = new UpdateOperation(event, lastModified);
if (logger.isTraceEnabled()) {
logger.trace("distributing operation for event : {} : for region : {}", event, this.getName());
}
op.distribute();
}
}
}
protected void setGeneratedVersionTag(boolean generateVersionTag) {
// there is at-least one other persistent member, so turn on concurrencyChecks
enableConcurrencyChecks();
this.generateVersionTag = generateVersionTag;
}
protected boolean getGenerateVersionTag() {
return this.generateVersionTag;
}
@Override
protected boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) {
if (logger.isTraceEnabled()) {
logger.trace("shouldGenerateVersionTag this.generateVersionTag={} ccenabled={} dataPolicy={} event:{}",
this.generateVersionTag, this.concurrencyChecksEnabled, this.dataPolicy, event);
}
if (!this.concurrencyChecksEnabled || this.dataPolicy == DataPolicy.EMPTY || !this.generateVersionTag) {
return false;
}
if (this.srp != null) { // client
return false;
}
if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
return false;
}
if (event.getOperation().isLocal()) { // bug #45402 - localDestroy generated a version tag
return false;
}
if (!event.isOriginRemote() && this.dataPolicy.withReplication()) {
return true;
}
if (!this.dataPolicy.withReplication() && !this.dataPolicy.withPersistence()) {
if (!entry.getVersionStamp().hasValidVersion()) {
// do not generate a version stamp in a region that has no replication if it's not based
// on an existing version from a replicate region
return false;
}
return true;
}
if (!event.isOriginRemote() && event.getDistributedMember() != null) {
if (!event.getDistributedMember().equals(this.getMyId())) {
return event.getVersionTag() == null; // one-hop remote message
}
}
return false;
}
/**
* Throws RegionAccessException if required roles are missing and the
* LossAction is NO_ACCESS
*
* @throws RegionAccessException
* if required roles are missing and the LossAction is NO_ACCESS
*/
@Override
protected void checkForNoAccess()
{
if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
if (getMembershipAttributes().getLossAction().isNoAccess()) {
synchronized (this.missingRequiredRoles) {
if (!this.isMissingRequiredRoles)
return;
Set roles = Collections.unmodifiableSet(new HashSet(
this.missingRequiredRoles));
throw new RegionAccessException(LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1.toLocalizedString(new Object[] {getMembershipAttributes().getLossAction(), roles}), getFullPath(), roles);
}
}
}
}
/**
* Throws RegionAccessException if required roles are missing and the
* LossAction is either NO_ACCESS or LIMITED_ACCESS.
*
* @throws RegionAccessException
* if required roles are missing and the LossAction is either
* NO_ACCESS or LIMITED_ACCESS
*/
@Override
protected void checkForLimitedOrNoAccess()
{
if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
if (getMembershipAttributes().getLossAction().isNoAccess()
|| getMembershipAttributes().getLossAction().isLimitedAccess()) {
synchronized (this.missingRequiredRoles) {
if (!this.isMissingRequiredRoles)
return;
Set roles = Collections.unmodifiableSet(new HashSet(
this.missingRequiredRoles));
Assert.assertTrue(!roles.isEmpty());
throw new RegionAccessException(LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1
.toLocalizedString(new Object[] { getMembershipAttributes().getLossAction(), roles}), getFullPath(), roles);
}
}
}
}
@Override
protected void handleReliableDistribution(ReliableDistributionData data,
Set successfulRecipients) {
handleReliableDistribution(data, successfulRecipients,
Collections.EMPTY_SET, Collections.EMPTY_SET);
}
protected void handleReliableDistribution(ReliableDistributionData data,
Set successfulRecipients, Set otherRecipients1, Set otherRecipients2)
{
if (this.requiresReliabilityCheck) {
MembershipAttributes ra = getMembershipAttributes();
Set recipients = successfulRecipients;
// determine the successful roles
Set roles = new HashSet();
for (Iterator iter = recipients.iterator(); iter.hasNext();) {
InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
if (mbr != null) {
roles.addAll(mbr.getRoles());
}
}
for (Iterator iter = otherRecipients1.iterator(); iter.hasNext();) {
InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
if (mbr != null) {
roles.addAll(mbr.getRoles());
}
}
for (Iterator iter = otherRecipients2.iterator(); iter.hasNext();) {
InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
if (mbr != null) {
roles.addAll(mbr.getRoles());
}
}
// determine the missing roles
Set failedRoles = new HashSet(ra.getRequiredRoles());
failedRoles.removeAll(roles);
if (failedRoles.isEmpty())
return;
// if (rp.isAllAccessWithQueuing()) {
// this.rmq.add(data, failedRoles);
// } else {
throw new RegionDistributionException(LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_MAY_HAVE_FAILED_TO_NOTIFY_THESE_REQUIRED_ROLES_0.toLocalizedString(failedRoles), getFullPath(), failedRoles);
// }
}
}
/**
*
* Called when we do a distributed operation and don't have anyone to
* distribute it to. Since this is only called when no distribution was done
* (i.e. no recipients) we do not check isMissingRequiredRoles because it
* might no longer be true due to race conditions.
*
* @return false if this region has at least one required role and queuing is
* configured. Returns true if sending to no one is ok.
* @throws RoleException
* if a required role is missing and the LossAction is either
* NO_ACCESS or LIMITED_ACCESS.
* @since 5.0
*/
protected boolean isNoDistributionOk()
{
if (this.requiresReliabilityCheck) {
MembershipAttributes ra = getMembershipAttributes();
// if (ra.getLossAction().isAllAccessWithQueuing()) {
// return !ra.hasRequiredRoles();
// } else {
Set failedRoles = ra.getRequiredRoles();
throw new RegionDistributionException(LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_WAS_NOT_DONE_TO_THESE_REQUIRED_ROLES_0.toLocalizedString(failedRoles), getFullPath(), failedRoles);
// }
}
return true;
}
/**
* returns true if this Region does not distribute its operations to other
* members.
* @since 6.0
* @see HARegion#localDestroyNoCallbacks(Object)
*/
public boolean doesNotDistribute() {
return false;
}
@Override
public boolean shouldSyncForCrashedMember(InternalDistributedMember id) {
return !doesNotDistribute() && super.shouldSyncForCrashedMember(id);
}
/**
* Adjust the specified set of recipients by removing any of them that are
* currently having their data queued.
*
* @param recipients
the set of recipients that a message is to be distributed to.
* Recipients that are currently having their data queued will be
* removed from this set.
* @return the set, possibly null, of recipients that are currently having
* their data queued.
* @since 5.0
*/
protected Set adjustForQueuing(Set recipients)
{
Set result = null;
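// queuing support is currently disabled, so the recipient set is never
// adjusted and this always returns null (see the commented-out logic below)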
// if (this.requiresReliabilityCheck) {
// MembershipAttributes ra = getMembershipAttributes();
// if (ra.getLossAction().isAllAccessWithQueuing()) {
// Set currentQueuedRoles = this.rmq.getQueuingRoles();
// if (currentQueuedRoles != null) {
// // foreach recipient see if any of his roles are queued and if
// // they are remove him from recipients and add him to result
// Iterator it = recipients.iterator();
// while (it.hasNext()) {
// DistributedMember dm = (DistributedMember)it.next();
// Set dmRoles = dm.getRoles();
// if (!dmRoles.isEmpty()) {
// if (intersects(dmRoles, currentQueuedRoles)) {
// it.remove(); // fix for bug 34447
// if (result == null) {
// result = new HashSet();
// }
// result.add(dm);
// }
// }
// }
// }
// }
// }
return result;
}
/**
* Returns true if the two sets intersect
*
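* A minimal usage sketch (hypothetical role names):
* <pre>
* Set a = new HashSet(Arrays.asList("reader", "writer"));
* Set b = new HashSet(Arrays.asList("writer", "admin"));
* boolean overlap = DistributedRegion.intersects(a, b); // true
* </pre>
*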
* @param a
* a non-null non-empty set
* @param b
* a non-null non-empty set
* @return true if sets a and b intersect; false if not
* @since 5.0
*/
public static boolean intersects(Set a, Set b)
{
Iterator it;
Set target;
if (a.size() <= b.size()) {
it = a.iterator();
target = b;
}
else {
it = b.iterator();
target = a;
}
while (it.hasNext()) {
if (target.contains(it.next()))
return true;
}
return false;
}
@Override
public boolean requiresReliabilityCheck()
{
return this.requiresReliabilityCheck;
}
/**
* Returns true if the ExpiryTask is currently allowed to expire.
* <p>
* If the region is in NO_ACCESS due to reliability configuration, then no
* expiration actions are allowed.
* <p>
* If the region is in LIMITED_ACCESS due to reliability configuration, then
* only non-distributed expiration actions are allowed.
*/
@Override
protected boolean isExpirationAllowed(ExpiryTask expiry)
{
if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
if (getMembershipAttributes().getLossAction().isNoAccess()) {
return false;
}
if (getMembershipAttributes().getLossAction().isLimitedAccess()
&& expiry.isDistributedAction()) {
return false;
}
}
return true;
}
/**
* Performs the resumption action when reliability is resumed.
*
* @return true if asynchronous resumption is triggered
*/
protected boolean resumeReliability(InternalDistributedMember id,
Set newlyAcquiredRoles)
{
boolean async = false;
try {
ResumptionAction ra = getMembershipAttributes().getResumptionAction();
if (ra.isNone()) {
if (logger.isDebugEnabled()) {
logger.debug("Reliability resumption for action of none");
}
resumeExpiration();
}
else if (ra.isReinitialize()) {
async = true;
asyncResumeReliability(id, newlyAcquiredRoles);
}
}
catch (Exception e) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
}
return async;
}
/**
* Handles asynchronous ResumptionActions such as region reinitialize.
*/
private void asyncResumeReliability(final InternalDistributedMember id,
final Set newlyAcquiredRoles)
throws RejectedExecutionException {
final ResumptionAction ra = getMembershipAttributes().getResumptionAction();
getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
public void run()
{
try {
if (ra.isReinitialize()) {
if (logger.isDebugEnabled()) {
logger.debug("Reliability resumption for action of reinitialize");
}
if (!isDestroyed() && !cache.isClosed()) {
RegionEventImpl event = new RegionEventImpl(
DistributedRegion.this, Operation.REGION_REINITIALIZE, null,
false, getMyId(), generateEventID());
reinitialize(null, event);
}
synchronized (missingRequiredRoles) {
// any number of threads may be waiting on missingRequiredRoles
missingRequiredRoles.notifyAll();
if (hasListener() && id != null) {
// fire afterRoleGain event
RoleEventImpl relEvent = new RoleEventImpl(
DistributedRegion.this, Operation.REGION_CREATE, null,
true, id, newlyAcquiredRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_GAIN,
relEvent);
}
}
}
}
catch (Exception e) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
}
}
});
}
/** Reschedules expiry tasks when reliability is resumed. */
private void resumeExpiration()
{
boolean isNoAccess = getMembershipAttributes().getLossAction().isNoAccess();
boolean isLimitedAccess = getMembershipAttributes().getLossAction()
.isLimitedAccess();
if (!(isNoAccess || isLimitedAccess)) {
return; // early out: expiration was never affected by reliability
}
if (getEntryTimeToLive().getTimeout() > 0
&& (isNoAccess || (isLimitedAccess && getEntryTimeToLive().getAction()
.isDistributed()))) {
rescheduleEntryExpiryTasks();
}
else
if (getEntryIdleTimeout().getTimeout() > 0
&& (isNoAccess || (isLimitedAccess && getEntryIdleTimeout().getAction()
.isDistributed()))) {
rescheduleEntryExpiryTasks();
}
else
if (getCustomEntryTimeToLive() != null || getCustomEntryIdleTimeout() != null) {
// Force all entries to be rescheduled
rescheduleEntryExpiryTasks();
}
if (getRegionTimeToLive().getTimeout() > 0
&& (isNoAccess || (isLimitedAccess && getRegionTimeToLive().getAction()
.isDistributed()))) {
addTTLExpiryTask();
}
if (getRegionIdleTimeout().getTimeout() > 0
&& (isNoAccess || (isLimitedAccess && getRegionIdleTimeout()
.getAction().isDistributed()))) {
addIdleExpiryTask();
}
}
/**
* A boolean used to indicate whether it is initialization time, i.e. whether
* the distributed region is being created for the first time. The variable
* is used at the time of reliability loss.
*/
private boolean isInitializingThread = false;
/**
* Called when reliability is lost. If MembershipAttributes are configured
* with {@link LossAction#RECONNECT} then DistributedSystem reconnect will be
* called asynchronously.
*
* @return true if asynchronous resumption is triggered
*/
protected boolean lostReliability(final InternalDistributedMember id,
final Set newlyMissingRoles)
{
if (DistributedRegion.ignoreReconnect)
return false;
boolean async = false;
try {
if (getMembershipAttributes().getLossAction().isReconnect()) {
async = true;
if (isInitializingThread) {
doLostReliability(true, id, newlyMissingRoles);
}
else {
doLostReliability(false, id, newlyMissingRoles);
}
// we don't do this in the waiting pool because we're going to
// disconnect
// the distributed system, and it will wait for the pool to empty
/*
* moved to a new method called doLostReliability. Thread t = new
* Thread("Reconnect Distributed System") { public void run() { try { //
* TODO: may need to check isReconnecting and checkReadiness...
* initializationLatchAfterMemberTimeout.await(); // TODO:
* call reconnect here
* getSystem().tryReconnect((GemFireCache)getCache()); // added for
* reconnect. synchronized (missingRequiredRoles) { // any number of
* threads may be waiting on missingRequiredRoles
* missingRequiredRoles.notifyAll(); // need to fire an event if id is
* not null if (hasListener() && id != null) { RoleEventImpl relEvent =
* new RoleEventImpl( DistributedRegion.this, Operation.CACHE_RECONNECT,
* null, true, id, newlyMissingRoles); dispatchListenerEvent(
* EnumListenerEvent.AFTER_ROLE_LOSS, relEvent); } } } catch (Exception
* e) { } } };
* t.setDaemon(true); t.start();
*/
}
}
catch (CancelException cce) {
throw cce;
}
catch (Exception e) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
}
return async;
}
private void doLostReliability(boolean isInitializing,
final InternalDistributedMember id, final Set newlyMissingRoles)
{
try {
if (!isInitializing) {
// moved code to a new thread.
Thread t = new Thread(LocalizedStrings.DistributedRegion_RECONNECT_DISTRIBUTED_SYSTEM.toLocalizedString()) {
@Override
public void run()
{
try {
// TODO: may need to check isReconnecting and checkReadiness...
if (logger.isDebugEnabled()) {
logger.debug("Reliability loss with policy of reconnect and membership thread doing reconnect");
}
initializationLatchAfterMemberTimeout.await();
getSystem().tryReconnect(false, "Role Loss", getCache());
synchronized (missingRequiredRoles) {
// any number of threads may be waiting on missingRequiredRoles
missingRequiredRoles.notifyAll();
// need to fire an event if id is not null
if (hasListener() && id != null) {
RoleEventImpl relEvent = new RoleEventImpl(
DistributedRegion.this, Operation.CACHE_RECONNECT, null,
true, id, newlyMissingRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS,
relEvent);
}
}
}
catch (Exception e) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
}
}
};
t.setDaemon(true);
t.start();
}
else {
getSystem().tryReconnect(false, "Role Loss", getCache()); // added for
// reconnect.
synchronized (missingRequiredRoles) {
// any number of threads may be waiting on missingRequiredRoles
missingRequiredRoles.notifyAll();
// need to fire an event if id is not null
if (hasListener() && id != null) {
RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this,
Operation.CACHE_RECONNECT, null, true, id, newlyMissingRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
}
}
// } catch (CancelException cce){
// }
}
}
catch (CancelException cce) {
throw cce;
}
catch (Exception e) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
}
}
protected void lockCheckReadiness()
{
// fix for bug 32610
cache.getCancelCriterion().checkCancelInProgress(null);
checkReadiness();
}
@Override
public final Object validatedDestroy(Object key, EntryEventImpl event)
throws TimeoutException, EntryNotFoundException, CacheWriterException {
Lock dlock = this.getDistributedLockIfGlobal(key);
try {
return super.validatedDestroy(key, event);
} finally {
if (dlock != null) {
dlock.unlock();
}
}
}
/**
* @see LocalRegion#localDestroyNoCallbacks(Object)
*/
@Override
public void localDestroyNoCallbacks(Object key)
{
super.localDestroyNoCallbacks(key);
if (getScope().isGlobal()) {
try {
this.getLockService().freeResources(key);
}
catch (LockServiceDestroyedException ignore) {
}
}
}
/**
* @see LocalRegion#localDestroy(Object, Object)
*/
@Override
public void localDestroy(Object key, Object aCallbackArgument)
throws EntryNotFoundException
{
super.localDestroy(key, aCallbackArgument);
if (getScope().isGlobal()) {
try {
this.getLockService().freeResources(key);
}
catch (LockServiceDestroyedException ignore) {
}
}
}
/**
* @see LocalRegion#invalidate(Object, Object)
*/
@Override
public void invalidate(Object key, Object aCallbackArgument)
throws TimeoutException, EntryNotFoundException
{
validateKey(key);
validateCallbackArg(aCallbackArgument);
checkReadiness();
checkForLimitedOrNoAccess();
Lock dlock = this.getDistributedLockIfGlobal(key);
try {
super.validatedInvalidate(key, aCallbackArgument);
}
finally {
if (dlock != null)
dlock.unlock();
}
}
@Override
public Lock getRegionDistributedLock() throws IllegalStateException
{
lockCheckReadiness();
checkForLimitedOrNoAccess();
if (!this.scope.isGlobal()) {
throw new IllegalStateException(LocalizedStrings.DistributedRegion_DISTRIBUTION_LOCKS_ARE_ONLY_SUPPORTED_FOR_REGIONS_WITH_GLOBAL_SCOPE_NOT_0.toLocalizedString(this.scope));
}
return new RegionDistributedLock();
}
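/**
* {@inheritDoc}
* <p>
* A minimal usage sketch for a GLOBAL-scope region (hypothetical key and
* value):
* <pre>
* Lock entryLock = region.getDistributedLock("someKey");
* entryLock.lock();
* try {
*   region.put("someKey", someValue);
* } finally {
*   entryLock.unlock();
* }
* </pre>
*/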
@Override
public Lock getDistributedLock(Object key) throws IllegalStateException
{
validateKey(key);
lockCheckReadiness();
checkForLimitedOrNoAccess();
if (!this.scope.isGlobal()) {
throw new IllegalStateException(LocalizedStrings.DistributedRegion_DISTRIBUTION_LOCKS_ARE_ONLY_SUPPORTED_FOR_REGIONS_WITH_GLOBAL_SCOPE_NOT_0.toLocalizedString(this.scope));
}
if (isLockingSuspendedByCurrentThread()) {
throw new IllegalStateException(LocalizedStrings.DistributedRegion_THIS_THREAD_HAS_SUSPENDED_ALL_LOCKING_FOR_THIS_REGION.toLocalizedString());
}
return new DistributedLock(key);
}
/**
* Called while NOT holding lock on parent's subregions
*
* @throws IllegalStateException
* if region is not compatible with a region in another VM.
*
* @see LocalRegion#initialize(InputStream, InternalDistributedMember, InternalRegionArguments)
*/
@Override
protected void initialize(InputStream snapshotInputStream,
InternalDistributedMember imageTarget, InternalRegionArguments internalRegionArgs) throws TimeoutException,
IOException, ClassNotFoundException
{
Assert.assertTrue(!isInitialized());
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.initialize BEGIN: {}", getFullPath());
}
// if we're versioning entries we need a region-level version vector
if (this.scope.isDistributed() && this.concurrencyChecksEnabled) {
createVersionVector();
}
if (this.scope.isGlobal()) {
getLockService(); // create lock service eagerly now
}
final IndexUpdater indexUpdater = getIndexUpdater();
boolean sqlfGIILockTaken = false;
// this try block is to release the SQLF GII lock in finally
// which should be done after the bucket status has been set
// properly in LocalRegion#initialize()
try {
try {
try {
// take the GII lock to avoid missing entries while updating the
// index list for SQLFabric (#41330 and others)
if (indexUpdater != null) {
indexUpdater.lockForGII();
sqlfGIILockTaken = true;
}
PersistentMemberID persistentId = null;
boolean recoverFromDisk = isRecoveryNeeded();
DiskRegion dskRgn = getDiskRegion();
if (recoverFromDisk) {
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.getInitialImageAndRecovery: Starting Recovery");
}
dskRgn.initializeOwner(this); // do recovery
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.getInitialImageAndRecovery: Finished Recovery");
}
persistentId = dskRgn.getMyPersistentID();
}
// Create OQL indexes before starting GII.
createOQLIndexes(internalRegionArgs, recoverFromDisk);
if (getDataPolicy().withReplication()
|| getDataPolicy().withPreloaded()) {
getInitialImageAndRecovery(snapshotInputStream, imageTarget,
internalRegionArgs, recoverFromDisk, persistentId);
}
else {
new CreateRegionProcessor(this).initializeRegion();
if (snapshotInputStream != null) {
releaseBeforeGetInitialImageLatch();
loadSnapshotDuringInitialization(snapshotInputStream);
}
}
}
catch (DiskAccessException dae) {
this.handleDiskAccessException(dae, true);
throw dae;
}
initMembershipRoles();
isInitializingThread = false;
super.initialize(null, null, null); // makes sure all latches are released if they haven't been already
} finally {
if (this.eventTracker != null) {
this.eventTracker.setInitialized();
}
}
} finally {
if (sqlfGIILockTaken) {
indexUpdater.unlockForGII();
}
}
}
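/**
* Called once initialization completes; distributes this member's updated
* profile to the other members of the distributed system.
*/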
@Override
public void initialized() {
new UpdateAttributesProcessor(this).distribute(false);
}
/** True if GII was impacted by missing required roles */
private boolean giiMissingRequiredRoles = false;
/**
* A reference counter to protect the memoryThresholdReached boolean
*/
private final Set<DistributedMember> memoryThresholdReachedMembers =
new CopyOnWriteArraySet<DistributedMember>();
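/** Queue used when this region is backed by HDFS; assumed to be initialized lazily elsewhere when an HDFS store is configured. */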
private ConcurrentParallelGatewaySenderQueue hdfsQueue;
/** Would set and return giiMissingRequiredRoles; currently always returns false. */
private boolean checkInitialImageForReliability(
InternalDistributedMember imageTarget,
CacheDistributionAdvisor.InitialImageAdvice advice)
{
// assumption: required roles are interesting to GII only if Reinitialize...
// reliability checking during GII is currently disabled, so always report
// that no required roles are missing (see the commented-out code below)
return false;
// if (getMembershipAttributes().hasRequiredRoles()
// && getMembershipAttributes().getResumptionAction().isReinitialize()) {
// // are any required roles missing for GII with Reinitialize?
// Set missingRR = new HashSet(getMembershipAttributes().getRequiredRoles());
// missingRR.removeAll(getSystem().getDistributedMember().getRoles());
// for (Iterator iter = advice.replicates.iterator(); iter.hasNext();) {
// DistributedMember member = (DistributedMember)iter.next();
// missingRR.removeAll(member.getRoles());
// }
// for (Iterator iter = advice.others.iterator(); iter.hasNext();) {
// DistributedMember member = (DistributedMember)iter.next();
// missingRR.removeAll(member.getRoles());
// }
// for (Iterator iter = advice.preloaded.iterator(); iter.hasNext();) {
// DistributedMember member = (DistributedMember)iter.next();
// missingRR.removeAll(member.getRoles());
// }
// if (!missingRR.isEmpty()) {
// // entering immediate loss condition, which will cause reinit on resume
// this.giiMissingRequiredRoles = true;
// }
// }
// return this.giiMissingRequiredRoles;
}
private void getInitialImageAndRecovery(InputStream snapshotInputStream,
InternalDistributedMember imageSrc, InternalRegionArguments internalRegionArgs,
boolean recoverFromDisk, PersistentMemberID persistentId) throws TimeoutException
{
logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZING_REGION_0, this.getName()));
ImageState imgState = getImageState();
imgState.init();
boolean targetRecreated = internalRegionArgs.getRecreateFlag();
Boolean isCBool = (Boolean)isConversion.get();
boolean isForConversion = isCBool != null ? isCBool.booleanValue() : false;
if (recoverFromDisk && snapshotInputStream != null && !isForConversion) {
throw new InternalGemFireError(LocalizedStrings.DistributedRegion_IF_LOADING_A_SNAPSHOT_THEN_SHOULD_NOT_BE_RECOVERING_ISRECOVERING_0_SNAPSHOTSTREAM_1.toLocalizedString(new Object[] {Boolean.valueOf(recoverFromDisk), snapshotInputStream}));
}
ProfileExchangeProcessor targetProvider;
if (dataPolicy.withPersistence()) {
targetProvider = new CreatePersistentRegionProcessor(this,
getPersistenceAdvisor(), recoverFromDisk);
}
else {
// this will go in the advisor profile
targetProvider = new CreateRegionProcessor(this);
}
imgState.setInRecovery(false);
RegionVersionVector recovered_rvv = null;
if (dataPolicy.withPersistence()) {
recovered_rvv = (this.getVersionVector()==null?null:this.getVersionVector().getCloneForTransmission());
}
// initializeRegion will send out our profile
targetProvider.initializeRegion();
if(persistenceAdvisor != null) {
persistenceAdvisor.initialize();
}
// Register listener here so that the remote members are known
// since registering calls initializeCriticalMembers (which needs to know about
// remote members)
if (!isInternalRegion()) {
if (!this.isDestroyed) {
cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this);
}
}
releaseBeforeGetInitialImageLatch();
// allow GII to invoke test hooks. Do this just after releasing the
// before-gii latch for bug #48962. See ConcurrentLeaveDuringGIIDUnitTest
InitialImageOperation.beforeGetInitialImage(this);
if (snapshotInputStream != null) {
try {
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.getInitialImageAndRecovery: About to load snapshot, isInitialized={}; {}",
isInitialized(), getFullPath());
}
loadSnapshotDuringInitialization(snapshotInputStream);
}
catch (IOException e) {
throw new RuntimeException(e); // @todo change this exception?
}
catch (ClassNotFoundException e) {
throw new RuntimeException(e); // @todo change this exception?
}
cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
return;
}
// No snapshot provided, use the imageTarget(s)
// if we were given a recommended imageTarget, use that first, and
// treat it like it is a replicate (regardless of whether it actually is
// or not)
InitialImageOperation iiop = new InitialImageOperation(this, this.entries);
// [defunct] Special case GII for PR admin regions (which are always
// replicates and always writers)
// bruce: this was commented out after adding the GIIAckRequest logic to
// force consistency before the gii operation begins
// if (isUsedForPartitionedRegionAdmin() ||
// isUsedForPartitionedRegionBucket()) {
// releaseBeforeGetInitialImageLatch();
// iiop.getFromAll(this.distAdvisor.adviseGeneric(), false);
// cleanUpDestroyedTokens();
// return;
// }
CacheDistributionAdvisor.InitialImageAdvice advice = null;
boolean done = false;
while(!done && !isDestroyed()) {
advice = targetProvider.getInitialImageAdvice(advice);
checkInitialImageForReliability(imageSrc, advice);
boolean attemptGetFromOne =
imageSrc != null // we were given a specific member
|| this.dataPolicy.withPreloaded()
&& !advice.preloaded.isEmpty() // this is a preloaded region
|| (!advice.replicates.isEmpty());
// That is: if we have 0 or 1 giiProvider then we can do a getFromOne gii;
// if we have 2 or more giiProviders then we must do a getFromAll gii.
if (attemptGetFromOne) {
if (recoverFromDisk) {
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER){
CacheObserverHolder.getInstance().afterMarkingGIIStarted();
}
}
{
// If we have an imageSrc and the target is reinitializing mark the
// getInitialImage so that it will wait until the target region is fully initialized
// before responding to the get image request. Otherwise, the
// source may respond with no data because it is still initializing,
// e.g. loading a snapshot.
// Plan A: use specified imageSrc, if specified
if (imageSrc != null) {
try {
GIIStatus ret = iiop.getFromOne(Collections.singleton(imageSrc),
targetRecreated, advice, recoverFromDisk, recovered_rvv);
if (GIIStatus.didGII(ret)) {
this.giiMissingRequiredRoles = false;
cleanUpDestroyedTokensAndMarkGIIComplete(ret);
done = true;
return;
}
} finally {
imageSrc = null;
}
}
// Plan C: use a replicate, if one exists
GIIStatus ret = iiop.getFromOne(advice.replicates, false, advice, recoverFromDisk, recovered_rvv);
if (GIIStatus.didGII(ret)) {
cleanUpDestroyedTokensAndMarkGIIComplete(ret);
done = true;
return;
}
// Plan D: if this is a PRELOADED region, fetch from another PRELOADED
if (this.dataPolicy.isPreloaded()) {
GIIStatus ret_preload = iiop.getFromOne(advice.preloaded, false, advice, recoverFromDisk, recovered_rvv);
if (GIIStatus.didGII(ret_preload)) {
cleanUpDestroyedTokensAndMarkGIIComplete(ret_preload);
done = true;
return;
}
} // isPreloaded
}
//If we got to this point, we failed in the GII. Cleanup
//any partial image we received
cleanUpAfterFailedGII(recoverFromDisk);
} // attemptGetFromOne
else {
if(!isDestroyed()) {
if(recoverFromDisk) {
logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZED_FROM_DISK,
new Object[] {this.getFullPath(), persistentId, getPersistentID()}));
if(persistentId != null) {
RegionLogger.logRecovery(this.getFullPath(), persistentId,
getDistributionManager().getDistributionManagerId());
}
} else {
RegionLogger.logCreate(this.getFullPath(),
getDistributionManager().getDistributionManagerId());
if (getPersistentID() != null) {
RegionLogger.logPersistence(this.getFullPath(),
getDistributionManager().getDistributionManagerId(),
getPersistentID());
logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_NEW_PERSISTENT_REGION_CREATED,
new Object[] {this.getFullPath(), getPersistentID()}));
}
}
/* no more union GII
// do union getInitialImage
Set rest = new HashSet();
rest.addAll(advice.others);
rest.addAll(advice.preloaded);
// push profile w/ recovery flag turned off at same time that we
// do a union getInitialImage
boolean pushProfile = recoverFromDisk;
iiop.getFromAll(rest, pushProfile);
*/
cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
done = true;
return;
}
break;
}
}
return;
}
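/**
* Synchronizes this region with the given target member for changes from
* the given version source, delegating to InitialImageOperation.
*/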
private void synchronizeWith(InternalDistributedMember target,
VersionSource idToRecover) {
InitialImageOperation op = new InitialImageOperation(this, this.entries);
op.synchronizeWith(target, idToRecover, null);
}
/**
* If this region has concurrency controls enabled this will pull any missing
* changes from other replicates using InitialImageOperation and a filtered
* chunking protocol.
*/
public void synchronizeForLostMember(InternalDistributedMember
lostMember, VersionSource lostVersionID) {
if (!this.concurrencyChecksEnabled) {
return;
}
CacheDistributionAdvisor advisor = getCacheDistributionAdvisor();
Set<InternalDistributedMember> targets = advisor.adviseInitializedReplicates();
for (InternalDistributedMember target: targets) {
synchronizeWith(target, lostVersionID, lostMember);
}
}
/**
* synchronize with another member wrt messages from the given "lost" member.
* This can be used when a primary bucket crashes to ensure that interrupted
* message distribution is mended.
*/
private void synchronizeWith(InternalDistributedMember target,
VersionSource versionMember, InternalDistributedMember lostMember) {
InitialImageOperation op = new InitialImageOperation(this, this.entries);
op.synchronizeWith(target, versionMember, lostMember);
}
/**
* invoked just before an initial image is requested from another member
*/
/** remove any partial entries received in a failed GII */
protected void cleanUpAfterFailedGII(boolean recoverFromDisk) {
DiskRegion dskRgn = getDiskRegion();
//if we have a persistent region, instead of deleting everything on disk,
//we will just reset the "recovered from disk" flag. After
//the next GII we will delete these entries if they do not come
//in as part of the GII.
if (recoverFromDisk && dskRgn != null && dskRgn.isBackup()) {
dskRgn.resetRecoveredEntries(this);
return;
}
if (!this.entries.isEmpty()) {
closeEntries();
if (getDiskRegion() != null) {
getDiskRegion().clear(this, null);
}
// clear the left-members and version-tags sets in imageState
getImageState().getLeftMembers();
getImageState().getVersionTags();
// Clear OQL indexes
if (this.indexManager != null) {
try {
this.indexManager.rerunIndexCreationQuery();
} catch (Exception ex){
if (logger.isDebugEnabled()) {
logger.debug("Exception while clearing indexes after GII failure.", ex);
}
}
}
}
}
private void initMembershipRoles()
{
synchronized (this.advisorListener) {
// hold sync to prevent listener from changing initial members
Set others = this.distAdvisor
.addMembershipListenerAndAdviseGeneric(this.advisorListener);
this.advisorListener.addMembers(others);
// initialize missing required roles with initial member info
if (getMembershipAttributes().hasRequiredRoles()) {
// AdvisorListener will also sync on missingRequiredRoles
synchronized (this.missingRequiredRoles) {
this.missingRequiredRoles.addAll(getMembershipAttributes()
.getRequiredRoles());
// remove all the roles we are playing since they will never be
// missing
this.missingRequiredRoles.removeAll(getSystem()
.getDistributedMember().getRoles());
for (Iterator iter = others.iterator(); iter.hasNext();) {
DistributedMember other = (DistributedMember)iter.next();
this.missingRequiredRoles.removeAll(other.getRoles());
}
}
}
}
if (getMembershipAttributes().hasRequiredRoles()) {
// wait up to memberTimeout for required roles...
// boolean requiredRolesAreMissing = false;
int memberTimeout = getSystem().getConfig().getMemberTimeout();
if (logger.isDebugEnabled()) {
logger.debug("Waiting up to {} for required roles.", memberTimeout);
}
try {
if (this.giiMissingRequiredRoles) {
// force reliability loss and possibly resumption
isInitializingThread = true;
synchronized (this.advisorListener) {
synchronized (this.missingRequiredRoles) {
// forcing state of loss because of bad GII
this.isMissingRequiredRoles = true;
getCachePerfStats().incReliableRegionsMissing(1);
if (getMembershipAttributes().getLossAction().isAllAccess())
getCachePerfStats().incReliableRegionsMissingFullAccess(1);
else if (getMembershipAttributes().getLossAction()
.isLimitedAccess())
getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
else if (getMembershipAttributes().getLossAction().isNoAccess())
getCachePerfStats().incReliableRegionsMissingNoAccess(1);
// (stats incremented above according to the configured LossAction)
if (logger.isDebugEnabled()) {
logger.debug("GetInitialImage had missing required roles.");
}
// TODO: will this work with RECONNECT and REINITIALIZE?
isInitializingThread = true;
lostReliability(null, null);
if (this.missingRequiredRoles.isEmpty()) {
// all required roles are present so force resumption
this.isMissingRequiredRoles = false;
getCachePerfStats().incReliableRegionsMissing(-1);
if (getMembershipAttributes().getLossAction().isAllAccess())
getCachePerfStats().incReliableRegionsMissingFullAccess(-1);
else if (getMembershipAttributes().getLossAction()
.isLimitedAccess())
getCachePerfStats()
.incReliableRegionsMissingLimitedAccess(-1);
else if (getMembershipAttributes().getLossAction().isNoAccess())
getCachePerfStats().incReliableRegionsMissingNoAccess(-1);
// (stats decremented above according to the configured LossAction)
boolean async = resumeReliability(null, null);
if (async) {
advisorListener.destroyed = true;
}
}
}
}
}
else {
if (!getSystem().isLoner()) {
waitForRequiredRoles(memberTimeout);
}
synchronized (this.advisorListener) {
synchronized (this.missingRequiredRoles) {
if (this.missingRequiredRoles.isEmpty()) {
Assert.assertTrue(!this.isMissingRequiredRoles);
if (logger.isDebugEnabled()) {
logger.debug("Initialization completed with all required roles present.");
}
}
else {
// starting in state of loss...
this.isMissingRequiredRoles = true;
getCachePerfStats().incReliableRegionsMissing(1);
if (getMembershipAttributes().getLossAction().isAllAccess())
getCachePerfStats().incReliableRegionsMissingFullAccess(1);
else if (getMembershipAttributes().getLossAction()
.isLimitedAccess())
getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
else if (getMembershipAttributes().getLossAction().isNoAccess())
getCachePerfStats().incReliableRegionsMissingNoAccess(1);
if (logger.isDebugEnabled()) {
logger.debug("Initialization completed with missing required roles: {}", this.missingRequiredRoles);
}
isInitializingThread = true;
lostReliability(null, null);
}
}
}
}
}
catch (RegionDestroyedException ignore) {
// ignore to fix bug 34639; may be thrown by waitForRequiredRoles
}
catch (CancelException ignore) {
// ignore to fix bug 34639; may be thrown by waitForRequiredRoles
if (isInitializingThread) {
throw ignore;
}
}
catch (Exception e) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
}
}
// open latch which will allow any threads in lostReliability to proceed
this.initializationLatchAfterMemberTimeout.countDown();
}
private boolean isRecoveryNeeded() {
return getDataPolicy().withPersistence()
&& getDiskRegion().isRecreated();
}
// called by InitialImageOperation to clean up destroyed tokens
// release afterGetInitialImageInitializationLatch before unlocking
// cleanUpLock
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK")
private void cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus giiStatus)
{
//We need to clean up the disk before we release the after get initial image latch
DiskRegion dskRgn = getDiskRegion();
if (dskRgn != null && dskRgn.isBackup()) {
dskRgn.finishInitializeOwner(this, giiStatus);
}
ImageState is = getImageState();
is.lockGII();
// drain the version tag and left-members sets; the getters clear them as a side effect
is.getVersionTags();
is.getLeftMembers();
// remove DESTROYED tokens
RegionVersionVector rvv = is.getClearRegionVersionVector();
try {
Iterator/*<Object>*/ keysIt = getImageState().getDestroyedEntries();
while (keysIt.hasNext()) {
this.entries.removeIfDestroyed(keysIt.next());
}
if (rvv != null) {
// clear any entries received in the GII that are older than the RVV versions.
// this can happen if entry chunks were received prior to the clear() being
// processed
clearEntries(rvv);
}
//need to do this before we release the afterGetInitialImageLatch
if(persistenceAdvisor != null) {
persistenceAdvisor.setOnline(GIIStatus.didGII(giiStatus), false, getPersistentID());
}
}
finally {
// release after gii lock first so basicDestroy will see isInitialized()
// be true
// when they get the cleanUp lock.
try {
releaseAfterGetInitialImageLatch();
} finally { // make sure unlockGII is done for bug 40001
is.unlockGII();
}
}
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER){
CacheObserverHolder.getInstance().afterMarkingGIICompleted();
}
//"Initializing region {0}" which is not acompanied by a completed message. Users think thread is stuck in some operation. Hence adding this log
logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZING_REGION_COMPLETED_0, this.getName()));
}
/**
* @see LocalRegion#basicDestroy(EntryEventImpl, boolean, Object)
*/
@Override
protected
void basicDestroy(EntryEventImpl event,
boolean cacheWrite,
Object expectedOldValue)
throws EntryNotFoundException, CacheWriterException, TimeoutException {
// disallow local destruction for mirrored (KEYS_VALUES) regions
boolean invokeWriter = cacheWrite;
boolean hasSeen = false;
if (hasSeenEvent(event)) {
hasSeen = true;
}
checkIfReplicatedAndLocalDestroy(event);
try {
if (this.requiresOneHopForMissingEntry(event)) {
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
if (this.srp == null) {
// only assert for non-client regions.
Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
}
if (!event.isBulkOpInProgress() || this.dataPolicy.withStorage()) {
// removeAll will send a single one-hop for empty regions. For other missing entries
// we need to get a valid version number before modifying the local cache
// TODO: deltaGII: verify that delegating to a peer when this region is also a client is acceptable
boolean didDistribute = RemoteDestroyMessage.distribute(event, expectedOldValue, !this.generateVersionTag);
if (!this.generateVersionTag && !didDistribute) {
throw new PersistentReplicatesOfflineException();
}
if (didDistribute) {
if (logger.isTraceEnabled()) {
logger.trace("Event after remoteDestroy operation: {}", event);
}
invokeWriter = false; // remote cache invoked the writer
if (event.getVersionTag() == null) {
// if the event wasn't applied by the one-hop replicate it will not have a version tag
// and so should not be applied to this cache
return;
}
}
}
}
}
super.basicDestroy(event, invokeWriter, expectedOldValue);
// if this is a destroy coming in from remote source, free up lock resources
// if this is a local origin destroy, this will happen after lock is
// released
if (this.scope.isGlobal() && event.isOriginRemote()) {
try {
getLockService().freeResources(event.getKey());
}
catch (LockServiceDestroyedException ignore) {
}
}
return;
}
finally {
if (hasSeen) {
if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
event.getRemoveAllOperation().addEntry(event, true);
}
distributeDestroy(event, expectedOldValue);
event.invokeCallbacks(this,true, false);
}
}
}
@Override
void basicDestroyPart3(RegionEntry re, EntryEventImpl event,
boolean inTokenMode, boolean duringRI, boolean invokeCallbacks, Object expectedOldValue) {
distributeDestroy(event, expectedOldValue);
super.basicDestroyPart3(re, event, inTokenMode, duringRI, invokeCallbacks, expectedOldValue);
}
void distributeDestroy(EntryEventImpl event, Object expectedOldValue) {
if (event.isDistributed() && !event.isOriginRemote() && !event.isBulkOpInProgress()) {
boolean distribute = !event.getInhibitDistribution();
if (distribute) {
DestroyOperation op = new DestroyOperation(event);
op.distribute();
}
}
}
@Override
boolean evictDestroy(LRUEntry entry) {
boolean evictDestroyWasDone = super.evictDestroy(entry);
if (evictDestroyWasDone) {
if (this.scope.isGlobal()) {
try {
getLockService().freeResources(entry.getKey());
}
catch (LockServiceDestroyedException ignore) {
}
}
}
return evictDestroyWasDone;
}
/**
* @see LocalRegion#basicInvalidateRegion(RegionEventImpl)
*/
@Override
void basicInvalidateRegion(RegionEventImpl event)
{
// disallow local invalidation for replicated regions
if (!event.isDistributed() && getScope().isDistributed()
&& getDataPolicy().withReplication()) {
throw new IllegalStateException(LocalizedStrings.DistributedRegion_NOT_ALLOWED_TO_DO_A_LOCAL_INVALIDATION_ON_A_REPLICATED_REGION.toLocalizedString());
}
if (shouldDistributeInvalidateRegion(event)) {
distributeInvalidateRegion(event);
}
super.basicInvalidateRegion(event);
}
/**
* decide if InvalidateRegionOperation should be sent to peers. broken out so
* that BucketRegion can override
* @param event
* @return true if {@link InvalidateRegionOperation} should be distributed, false otherwise
*/
protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) {
return event.isDistributed() && !event.isOriginRemote();
}
/**
* Distribute the invalidate of a region given its event.
* This implementation sends the invalidate to peers.
* @since 5.7
*/
protected void distributeInvalidateRegion(RegionEventImpl event) {
new InvalidateRegionOperation(event).distribute();
}
/**
* @see LocalRegion#basicDestroyRegion(RegionEventImpl, boolean, boolean,
* boolean)
*/
@Override
void basicDestroyRegion(RegionEventImpl event, boolean cacheWrite,
boolean lock, boolean callbackEvents) throws CacheWriterException,
TimeoutException
{
final String path = getFullPath();
//Keep track of regions that are being destroyed. This helps avoid a race
//when another member concurrently creates this region. See bug 42051.
boolean isClose = event.getOperation().isClose();
if(!isClose) {
cache.beginDestroy(path, this);
}
try {
super.basicDestroyRegion(event, cacheWrite, lock, callbackEvents);
// send destroy region operation even if this is a localDestroyRegion (or
// close)
if (!event.isOriginRemote()) {
distributeDestroyRegion(event, true);
} else {
if(!event.isReinitializing()) {
RegionEventImpl localEvent = new RegionEventImpl(this,
Operation.REGION_LOCAL_DESTROY, event.getCallbackArgument(), false, getMyId(),
generateEventID()/* generate EventID */);
distributeDestroyRegion(localEvent, false/*fixes bug 41111*/);
}
}
notifyBridgeClients(event);
}
catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("basicDestroyRegion short-circuited due to cancellation");
}
}
finally {
if(!isClose) {
cache.endDestroy(path, this);
}
RegionLogger.logDestroy(path, getMyId(), getPersistentID(), isClose);
}
}
@Override
protected void distributeDestroyRegion(RegionEventImpl event,
boolean notifyOfRegionDeparture) {
if(persistenceAdvisor != null) {
persistenceAdvisor.releaseTieLock();
}
new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
}
/**
* Return true if invalidation occurred; false if it did not, for example if
* it was already invalidated
*
* @see LocalRegion#basicInvalidate(EntryEventImpl)
*/
@Override
void basicInvalidate(EntryEventImpl event) throws EntryNotFoundException
{
boolean hasSeen = false;
if (hasSeenEvent(event)) {
hasSeen = true;
}
try {
// disallow local invalidation for replicated regions
if (event.isLocalInvalid() && !event.getOperation().isLocal() && getScope().isDistributed()
&& getDataPolicy().withReplication()) {
throw new IllegalStateException(LocalizedStrings.DistributedRegion_NOT_ALLOWED_TO_DO_A_LOCAL_INVALIDATION_ON_A_REPLICATED_REGION.toLocalizedString());
}
if (this.requiresOneHopForMissingEntry(event)) {
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null/* || re.isTombstone()*/ || !this.generateVersionTag) {
if (this.srp == null) {
// only assert for non-client regions.
Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
}
// TODO: deltaGII: verify that delegating to a peer when this region is also a client is acceptable
boolean didDistribute = RemoteInvalidateMessage.distribute(event, !this.generateVersionTag);
if (!this.generateVersionTag && !didDistribute) {
throw new PersistentReplicatesOfflineException();
}
if (didDistribute) {
if (logger.isDebugEnabled()) {
logger.debug("Event after remoteInvalidate operation: {}", event);
}
if (event.getVersionTag() == null) {
// if the event wasn't applied by the one-hop replicate it will not have a version tag
// and so should not be applied to this cache
return;
}
}
}
}
super.basicInvalidate(event);
return;
} finally {
if (hasSeen) {
distributeInvalidate(event);
event.invokeCallbacks(this,true, false);
}
}
}
@Override
void basicInvalidatePart3(RegionEntry re, EntryEventImpl event,
boolean invokeCallbacks) {
distributeInvalidate(event);
super.basicInvalidatePart3(re, event, invokeCallbacks);
}
void distributeInvalidate(EntryEventImpl event) {
if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
&& !isTX() /* only distribute if non-tx */) {
if (!event.getInhibitDistribution()) {
InvalidateOperation op = new InvalidateOperation(event);
op.distribute();
}
}
}
@Override
void basicUpdateEntryVersion(EntryEventImpl event)
throws EntryNotFoundException {
try {
if (!hasSeenEvent(event)) {
super.basicUpdateEntryVersion(event);
}
return;
} finally {
distributeUpdateEntryVersion(event);
}
}
private void distributeUpdateEntryVersion(EntryEventImpl event) {
if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
&& !isTX() /* only distribute if non-tx */) {
UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
op.distribute();
}
}
@Override
protected void basicClear(RegionEventImpl ev)
{
Lock dlock = this.getRegionDistributedLockIfGlobal();
try {
super.basicClear(ev);
}
finally {
if (dlock != null)
dlock.unlock();
}
}
@Override
void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
if (this.concurrencyChecksEnabled && !this.dataPolicy.withReplication()) {
boolean retry = false;
do {
// non-replicate regions must defer to a replicate for clear/invalidate of region
Set<InternalDistributedMember> repls = this.distAdvisor.adviseReplicates();
if (repls.size() > 0) {
InternalDistributedMember mbr = repls.iterator().next();
RemoteRegionOperation op = RemoteRegionOperation.clear(mbr, this);
try {
op.distribute();
return;
} catch (CancelException e) {
this.stopper.checkCancelInProgress(e);
retry = true;
} catch (RemoteOperationException e) {
this.stopper.checkCancelInProgress(e);
retry = true;
}
}
} while (retry);
}
// if no version vector or if no replicates are around, use the default mechanism
super.basicClear(regionEvent, cacheWrite);
}
@Override
void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
boolean enableRVV = useRVV && this.dataPolicy.withReplication() && this.concurrencyChecksEnabled && !getDistributionManager().isLoner();
//Fix for 46338 - apparently multiple threads from the same VM are allowed
//to suspend locking, which is what distributedLockForClear() does. We don't
//want that to happen, so we'll synchronize to make sure only one thread on
//this member performs a clear.
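//The RVV-based clear below runs in four steps: (1) take the distributed
//"_clearOperation" lock so only one member clears at a time, (2) lock version
//generation on all participants and flush their in-flight operations to this
//member, (3) clear locally and distribute the DistributedClearOperation, and
//(4) release the write locks and then the distributed lock.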
synchronized(clearLock) {
if (enableRVV) {
distributedLockForClear();
try {
Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion();
// pause all generation of versions and flush from the other members to this one
try {
obtainWriteLocksForClear(regionEvent, participants);
clearRegionLocally(regionEvent, cacheWrite, null);
if (!regionEvent.isOriginRemote() && regionEvent.isDistributed()) {
DistributedClearOperation.clear(regionEvent, null, participants);
}
} finally {
releaseWriteLocksForClear(regionEvent, participants);
}
}
finally {
distributedUnlockForClear();
}
} else {
Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion();
clearRegionLocally(regionEvent, cacheWrite, null);
if (!regionEvent.isOriginRemote() && regionEvent.isDistributed()) {
DistributedClearOperation.clear(regionEvent, null, participants);
}
}
}
// since clients do not maintain RVVs except for tombstone GC
// we need to ensure that current ops reach the client queues
// before queuing a clear, but there is no infrastructure for doing so
notifyBridgeClients(regionEvent);
}
/**
* Obtain a distributed lock for the clear operation.
*/
private void distributedLockForClear() {
if (!this.scope.isGlobal()) { // non-global regions must lock when using RVV
try {
getLockService().lock("_clearOperation", -1, -1);
} catch(IllegalStateException e) {
lockCheckReadiness();
throw e;
}
}
}
/**
* Release the distributed lock for the clear operation.
*/
private void distributedUnlockForClear() {
if (!this.scope.isGlobal()) {
try {
getLockService().unlock("_clearOperation");
} catch(IllegalStateException e) {
lockCheckReadiness();
throw e;
}
}
}
/** obtain locks preventing generation of new versions in other members
* @param participants
**/
private void obtainWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) {
lockLocallyForClear(getDistributionManager(), getMyId());
DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
}
/** pause local operations so that a clear() can be performed and flush comm channels to the given member
*/
public void lockLocallyForClear(DM dm, InternalDistributedMember locker) {
RegionVersionVector rvv = getVersionVector();
if (rvv != null) {
// block new operations from being applied to the region map
rvv.lockForClear(getFullPath(), dm, locker);
//Check for region destroyed after we have locked, to make sure
//we don't continue a clear if the region has been destroyed.
checkReadiness();
// wait for current operations to reach the other members (state flush)
if (!locker.equals(dm.getDistributionManagerId())) {
Set<InternalDistributedMember> mbrs = getDistributionAdvisor().adviseCacheOp();
StateFlushOperation.flushTo(mbrs, this);
}
}
}
/** releases the locks obtained in obtainWriteLocksForClear
* @param participants */
private void releaseWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) {
getVersionVector().unlockForClear(getMyId());
DistributedClearOperation.releaseLocks(regionEvent, participants);
}
/**
* Wait for in progress clears that were initiated by this member.
*/
private void waitForInProgressClear() {
RegionVersionVector rvv = getVersionVector();
if (rvv != null) {
synchronized(clearLock) {
//do nothing;
//DAN - I'm a little scared that the compiler might optimize
//away this synchronization if we really do nothing. Hence
//my fine log message below. This might not be necessary.
if (logger.isDebugEnabled()) {
logger.debug("Done waiting for clear");
}
}
}
}
/**
* Distribute Tombstone garbage-collection information to all peers with storage
*/
protected EventID distributeTombstoneGC(Set<Object> keysRemoved) {
this.getCachePerfStats().incTombstoneGCCount();
EventID eventId = new EventID(getSystem());
DistributedTombstoneOperation gc = DistributedTombstoneOperation.gc(this, eventId);
gc.distribute();
notifyClientsOfTombstoneGC(getVersionVector().getTombstoneGCVector(), keysRemoved, eventId, null);
return eventId;
}
// test hook for DistributedAckRegionCCEDUnitTest
public static boolean LOCALCLEAR_TESTHOOK;
@Override
void basicLocalClear(RegionEventImpl rEvent) {
if (getScope().isDistributed() && getDataPolicy().withReplication() && !LOCALCLEAR_TESTHOOK) {
throw new UnsupportedOperationException(LocalizedStrings.DistributedRegion_LOCALCLEAR_IS_NOT_SUPPORTED_ON_DISTRIBUTED_REPLICATED_REGIONS.toLocalizedString());
}
super.basicLocalClear(rEvent);
}
public final DistributionConfig getDistributionConfig() {
return getSystem().getDistributionManager().getConfig();
}
/**
* Sends a list of queued messages to members playing a specified role
*
* @param list
* List of QueuedOperation instances to send. Any messages sent will
* be removed from this list
* @param role
* the role that a recipient must be playing
* @return true if at least one message made it to at least one member playing
* the role
*/
boolean sendQueue(List list, Role role) {
SendQueueOperation op = new SendQueueOperation(getDistributionManager(),
this, list, role);
return op.distribute();
}
/*
* @see SearchLoadAndWriteProcessor#initialize(LocalRegion, Object, Object)
*/
public final CacheDistributionAdvisor getDistributionAdvisor()
{
return this.distAdvisor;
}
public final CacheDistributionAdvisor getCacheDistributionAdvisor()
{
return this.distAdvisor;
}
public final PersistenceAdvisor getPersistenceAdvisor() {
return this.persistenceAdvisor;
}
public final PersistentMemberID getPersistentID() {
return this.persistentId;
}
/** Returns the distribution profile; lazily creates one if needed */
public Profile getProfile() {
return this.distAdvisor.createProfile();
}
public void fillInProfile(Profile p) {
assert p instanceof CacheProfile;
CacheProfile profile = (CacheProfile)p;
profile.dataPolicy = getDataPolicy();
profile.hasCacheLoader = basicGetLoader() != null;
profile.hasCacheWriter = basicGetWriter() != null;
profile.hasCacheListener = hasListener();
Assert.assertTrue(this.scope.isDistributed());
profile.scope = this.scope;
profile.inRecovery = getImageState().getInRecovery();
profile.isPersistent = getDataPolicy().withPersistence();
profile.setSubscriptionAttributes(getSubscriptionAttributes());
// The PDX check below is for rolling upgrade support. Old WAN is being
// removed in this checkin. PDX regions are always gatewayEnabled, whether
// or not a gatewayHub is configured: old-version PDX regions always have
// this attribute set to true, so we set it to true here to avoid a region
// attribute mismatch across members.
profile.isGatewayEnabled = this.isPdxTypesRegion();
profile.serialNumber = getSerialNumber();
profile.regionInitialized = this.isInitialized();
if (!this.isUsedForPartitionedRegionBucket()) {
profile.memberUnInitialized = getCache().isUnInitializedMember(
profile.getDistributedMember());
}
else {
profile.memberUnInitialized = false;
}
profile.persistentID = getPersistentID();
if(getPersistenceAdvisor() != null) {
profile.persistenceInitialized = getPersistenceAdvisor().isOnline();
}
profile.hasCacheServer = ((this.cache.getCacheServers().size() > 0)?true:false);
profile.requiresOldValueInEvents = this.dataPolicy.withReplication() &&
this.filterProfile != null && this.filterProfile.hasCQs();
profile.gatewaySenderIds = getGatewaySenderIds();
profile.asyncEventQueueIds = getAsyncEventQueueIds();
profile.isOffHeap = getOffHeap();
}
/**
* Return the DistributedLockService associated with this Region. This method
* will lazily create that service the first time it is invoked on this
* region.
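* <p>
* A minimal usage sketch (the key name is illustrative only): the returned
* service can lock and unlock individual entry keys, e.g.
* <pre>
* DistributedLockService dls = getLockService();
* // wait forever (-1) with a non-expiring lease (-1)
* if (dls.lock("someKey", -1, -1)) {
* try {
* // operate on the globally locked entry
* } finally {
* dls.unlock("someKey");
* }
* }
* </pre>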
*/
public DistributedLockService getLockService()
{
synchronized (this.dlockMonitor) {
// Assert.assertTrue(this.scope.isGlobal()); since 7.0 this is used for distributing clear() ops
String svcName = getFullPath();
if (this.dlockService == null) {
this.dlockService = DistributedLockService.getServiceNamed(svcName);
if (this.dlockService == null) {
this.dlockService = DLockService.create(
getFullPath(),
getSystem(),
true /*distributed*/,
false /*destroyOnDisconnect*/, // region destroy will destroy dls
false /*automateFreeResources*/); // manual freeResources only
}
// handle is-lock-grantor region attribute...
if (this.isLockGrantor) {
this.dlockService.becomeLockGrantor();
}
if (logger.isDebugEnabled()) {
logger.debug("LockService for {} is using LockLease={}, LockTimeout=", svcName, getCache().getLockLease(), getCache().getLockTimeout());
}
}
return this.dlockService;
}
}
/**
* @see LocalRegion#isCurrentlyLockGrantor()
*/
@Override
protected boolean isCurrentlyLockGrantor()
{
if (!this.scope.isGlobal())
return false;
return getLockService().isLockGrantor();
}
@Override
public boolean isLockGrantor()
{
if (!this.scope.isGlobal())
return false;
return this.isLockGrantor;
}
@Override
public void becomeLockGrantor()
{
checkReadiness();
checkForLimitedOrNoAccess();
if (!this.scope.isGlobal()) {
throw new IllegalStateException(LocalizedStrings.DistributedRegion_DISTRIBUTION_LOCKS_ARE_ONLY_SUPPORTED_FOR_REGIONS_WITH_GLOBAL_SCOPE_NOT_0.toLocalizedString(this.scope));
}
DistributedLockService svc = getLockService();
try {
super.becomeLockGrantor();
if (!svc.isLockGrantor()) {
svc.becomeLockGrantor();
}
}
finally {
if (!svc.isLockGrantor()) {
if (logger.isDebugEnabled()) {
logger.debug("isLockGrantor is false after becomeLockGrantor for {}", getFullPath());
}
}
}
}
/** @return the deserialized value */
@Override
@Retained
protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
throws CacheLoaderException, TimeoutException
{
checkForLimitedOrNoAccess();
RegionEntry re = null;
final Object key = keyInfo.getKey();
final Object aCallbackArgument = keyInfo.getCallbackArg();
Operation op;
if (isCreate) {
op = Operation.CREATE;
}
else {
op = Operation.UPDATE;
}
long lastModified = 0L;
boolean fromServer = false;
EntryEventImpl event = null;
@Retained Object result = null;
boolean incrementUseCountForSqlf = false;
try {
{
if (this.srp != null) {
EntryEventImpl holder = EntryEventImpl.createVersionTagHolder();
try {
Object value = this.srp.get(key, aCallbackArgument, holder);
fromServer = value != null;
if (fromServer) {
event = EntryEventImpl.create(this, op, key, value,
aCallbackArgument, false,
getMyId(), generateCallbacks);
event.setVersionTag(holder.getVersionTag());
event.setFromServer(fromServer); // fix for bug 39358
if (clientEvent != null && clientEvent.getVersionTag() == null) {
clientEvent.setVersionTag(holder.getVersionTag());
}
}
} finally {
holder.release();
}
}
}
if (!fromServer) {
//Do not generate Event ID
event = EntryEventImpl.create(this, op, key, null /*newValue*/,
aCallbackArgument, false,
getMyId(), generateCallbacks);
if (requestingClient != null) {
event.setContext(requestingClient);
}
SearchLoadAndWriteProcessor processor =
SearchLoadAndWriteProcessor.getProcessor();
try {
processor.initialize(this, key, aCallbackArgument);
// processor fills in event
processor.doSearchAndLoad(event, txState, localValue);
if (clientEvent != null && clientEvent.getVersionTag() == null) {
clientEvent.setVersionTag(event.getVersionTag());
}
lastModified = processor.getLastModified();
}
finally {
processor.release();
}
}
if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
try {
// Set eventId. Required for interested clients.
event.setNewEventId(cache.getDistributedSystem());
long startPut = CachePerfStats.getStatTime();
validateKey(key);
// if (event.getOperation().isLoad()) {
// this.performedLoad(event, lastModified, txState);
// }
// this next step also distributes the object to other processes, if necessary
try {
// set the tail key so that the event is passed to GatewaySender queues.
// if the tailKey is not set, the event gets filtered out in ParallelGatewaySenderQueue
if (this instanceof BucketRegion) {
BucketRegion br = (BucketRegion)this;
if (br.getPartitionedRegion().isParallelWanEnabled()) {
br.handleWANEvent(event);
}
}
re = basicPutEntry(event, lastModified);
incrementUseCountForSqlf = GemFireCacheImpl.sqlfSystem();
} catch (ConcurrentCacheModificationException e) {
// the cache was modified while we were searching for this entry and
// the netsearch result was elided. Return the current value from the cache
re = getRegionEntry(key);
if (re != null) {
event.setNewValue(re.getValue(this)); // OFFHEAP: need to incrc, copy to heap to setNewValue, decrc
}
}
if (!isTX()) {
getCachePerfStats().endPut(startPut, event.isOriginRemote());
}
}
catch (CacheWriterException cwe) {
if (logger.isDebugEnabled()) {
logger.debug("findObjectInSystem: writer exception putting entry {} : {}", event, cwe);
}
}
}
if (isCreate) {
recordMiss(re, key);
}
if (preferCD) {
if (event.hasDelta()) {
result = event.getNewValue();
} else {
result = event.getRawNewValueAsHeapObject();
}
} else {
result = event.getNewValue();
}
// For SQLFire, we need to increment the use count so that the returned
// object has use count 2
if( incrementUseCountForSqlf && result instanceof Chunk) {
((Chunk)result).retain();
}
return result;
} finally {
if (event != null) {
event.release();
}
}
}
protected ConcurrentParallelGatewaySenderQueue getHDFSQueue() {
if (this.hdfsQueue == null) {
String asyncQId = this.getPartitionedRegion().getHDFSEventQueueName();
final AsyncEventQueueImpl asyncQ = (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(asyncQId);
final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor();
if (ep == null) return null;
hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
}
return hdfsQueue;
}
/** hook for subclasses to note that a cache load was performed
* @see BucketRegion#performedLoad
*/
// void performedLoad(EntryEventImpl event, long lastModifiedTime, TXState txState)
// throws CacheWriterException {
// // no action in DistributedRegion
// }
/**
* @see LocalRegion#cacheWriteBeforeDestroy(EntryEventImpl, Object)
* @return true if cacheWrite was performed
*/
@Override
boolean cacheWriteBeforeDestroy(EntryEventImpl event, Object expectedOldValue)
throws CacheWriterException, EntryNotFoundException, TimeoutException
{
boolean result = false;
if (event.isDistributed()) {
CacheWriter localWriter = basicGetWriter();
Set netWriteRecipients = localWriter == null ? this.distAdvisor
.adviseNetWrite() : null;
if ((localWriter != null
|| (netWriteRecipients != null && !netWriteRecipients.isEmpty())) &&
!event.inhibitAllNotifications()) {
final long start = getCachePerfStats().startCacheWriterCall();
try {
event.setOldValueFromRegion();
SearchLoadAndWriteProcessor processor =
SearchLoadAndWriteProcessor.getProcessor();
try {
processor.initialize(this, event.getKey(), null);
processor.doNetWrite(event, netWriteRecipients, localWriter,
SearchLoadAndWriteProcessor.BEFOREDESTROY);
result = true;
}
finally {
processor.release();
}
}
finally {
getCachePerfStats().endCacheWriterCall(start);
}
}
serverDestroy(event, expectedOldValue);
}
return result;
}
/**
* @see LocalRegion#cacheWriteBeforeRegionDestroy(RegionEventImpl)
*/
@Override
boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event)
throws CacheWriterException, TimeoutException
{
boolean result = false;
if (event.getOperation().isDistributed()) {
CacheWriter localWriter = basicGetWriter();
Set netWriteRecipients = localWriter == null ? this.distAdvisor
.adviseNetWrite() : null;
if (localWriter != null
|| (netWriteRecipients != null && !netWriteRecipients.isEmpty())) {
final long start = getCachePerfStats().startCacheWriterCall();
try {
SearchLoadAndWriteProcessor processor =
SearchLoadAndWriteProcessor.getProcessor();
try {
processor.initialize(this, "preDestroyRegion", null);
processor.doNetWrite(event, netWriteRecipients, localWriter,
SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY);
result = true;
}
finally {
processor.release();
}
}
finally {
getCachePerfStats().endCacheWriterCall(start);
}
}
serverRegionDestroy(event);
}
return result;
}
protected void distributedRegionCleanup(RegionEventImpl event)
{
if (event == null || event.getOperation() != Operation.REGION_REINITIALIZE) {
// Only perform this if the reinitialize is not due to resumption
// (REGION_REINITIALIZE); a null event means this was a failed
// initialize (create).
// Wake up any threads in waitForRequiredRoles... they will checkReadiness.
synchronized (this.missingRequiredRoles) {
this.missingRequiredRoles.notifyAll();
}
}
if(persistenceAdvisor != null) {
this.persistenceAdvisor.close(); // fix for bug 41094
}
this.distAdvisor.close();
DLockService dls = null;
//Fix for bug 46338. Wait for in progress clears before destroying the
//lock service, because destroying the service immediately releases the dlock
waitForInProgressClear();
synchronized (this.dlockMonitor) {
if (this.dlockService != null) {
dls = (DLockService)this.dlockService;
}
}
if (dls != null) {
try {
dls.destroyAndRemove();
}
catch (CancelException e) {
// bug 37118
if (logger.isDebugEnabled()) {
logger.debug("DLS destroy abridged due to shutdown", e);
}
}
catch (Exception ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.DistributedRegion_DLS_DESTROY_MAY_HAVE_FAILED_FOR_0, this.getFullPath()), ex);
}
}
if (this.rmq != null) {
this.rmq.close();
}
//Fix for #48066 - make sure that region operations are completely
//distributed to peers before destroying the region.
long timeout = 1000L * getCache().getDistributedSystem().getConfig().getAckWaitThreshold();
Boolean flushOnClose = !Boolean.getBoolean("gemfire.no-flush-on-close"); // test hook
if (!this.cache.forcedDisconnect() &&
flushOnClose && this.getDistributionManager().getMembershipManager() != null
&& this.getDistributionManager().getMembershipManager().isConnected()) {
getDistributionAdvisor().forceNewMembershipVersion();
try {
getDistributionAdvisor().waitForCurrentOperations(timeout);
} catch (Exception e) {
// log this but try to close the region so that listeners are invoked
logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_ERROR_CLOSING_REGION_1,
new Object[] { this, getFullPath() }), e);
}
}
}
/**
* In addition to inherited code this method also invokes
* RegionMembershipListeners
*/
@Override
protected void postCreateRegion()
{
super.postCreateRegion();
// should we sync on this.distAdvisor first to prevent bug 44369?
synchronized (this.advisorListener) {
Set others = this.advisorListener.getInitialMembers();
CacheListener[] listeners = fetchCacheListenersField();
if (listeners != null) {
for (int i = 0; i < listeners.length; i++) {
if (listeners[i] instanceof RegionMembershipListener) {
RegionMembershipListener rml = (RegionMembershipListener)listeners[i];
try {
DistributedMember[] otherDms = new DistributedMember[others
.size()];
others.toArray(otherDms);
rml.initialMembers(this, otherDms);
}
catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.error(LocalizedMessage.create(LocalizedStrings.DistributedRegion_EXCEPTION_OCCURRED_IN_REGIONMEMBERSHIPLISTENER), t);
}
}
}
}
Set<String> allGatewaySenderIds = getAllGatewaySenderIds();
if (!allGatewaySenderIds.isEmpty()) {
for (GatewaySender sender : cache.getAllGatewaySenders()) {
if (sender.isParallel()
&& allGatewaySenderIds.contains(sender.getId())) {
//Fix for Bug#51491. If we ever decide to support this configuration, we must call addShadowPartitionedRegionForUserRR here.
if (sender.getId().contains(
AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX)) {
throw new AsyncEventQueueConfigurationException(
LocalizedStrings.ParallelAsyncEventQueue_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1.toLocalizedString(new Object[] {
AsyncEventQueueImpl
.getAsyncEventQueueIdFromSenderId(sender.getId()),
this.getFullPath() }));
}
throw new GatewaySenderConfigurationException(
LocalizedStrings.ParallelGatewaySender_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1
.toLocalizedString(new Object[] { sender.getId(),
this.getFullPath() }));
// if (sender.isRunning()) {
// ConcurrentParallelGatewaySenderQueue parallelQueue =
// (ConcurrentParallelGatewaySenderQueue)((ParallelGatewaySenderImpl)sender)
// .getQueues().toArray(new RegionQueue[1])[0];
// parallelQueue.addShadowPartitionedRegionForUserRR(this);
// }
}
}
}
}
}
/**
* Free resources held by this region. This method is invoked after
* isDestroyed has been set to true.
*
* @see LocalRegion#postDestroyRegion(boolean, RegionEventImpl)
*/
@Override
protected void postDestroyRegion(boolean destroyDiskRegion,
RegionEventImpl event)
{
distributedRegionCleanup(event);
try {
super.postDestroyRegion(destroyDiskRegion, event);
}
catch (CancelException e) {
// I don't think this should ever happen: bulletproofing for bug 39454
logger.warn("postDestroyRegion: encountered cancellation", e);
}
if (this.rmq != null && destroyDiskRegion) {
this.rmq.destroy();
}
}
@Override
void cleanupFailedInitialization()
{
super.cleanupFailedInitialization();
try {
RegionEventImpl ev = new RegionEventImpl(this, Operation.REGION_CLOSE, null, false, getMyId(),
generateEventID());
distributeDestroyRegion(ev, true);
distributedRegionCleanup(null);
} catch(RegionDestroyedException e) {
//someone else must have concurrently destroyed the region (maybe a distributed destroy)
} catch(CancelException e) {
//cache or DS is closed, ignore
} catch(VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch(Throwable t) {
logger.warn(LocalizedMessage.create(LocalizedStrings.DistributedRegion_ERROR_CLEANING_UP_FAILED_INITIALIZATION, this), t);
}
}
/**
* @see LocalRegion#handleCacheClose(Operation)
*/
@Override
void handleCacheClose(Operation op)
{
try {
super.handleCacheClose(op);
}
finally {
distributedRegionCleanup(null);
}
}
/**
* invoke a cache writer before a put is performed elsewhere
*
* @see LocalRegion#cacheWriteBeforePut(EntryEventImpl, Set, CacheWriter, boolean, Object)
*/
@Override
protected void cacheWriteBeforePut(EntryEventImpl event, Set netWriteRecipients,
CacheWriter localWriter,
boolean requireOldValue,
Object expectedOldValue)
throws CacheWriterException, TimeoutException
{
if ((localWriter != null
|| (netWriteRecipients != null && !netWriteRecipients.isEmpty())) &&
!event.inhibitAllNotifications()) {
final boolean isNewKey = event.getOperation().isCreate();
final long start = getCachePerfStats().startCacheWriterCall();
try {
SearchLoadAndWriteProcessor processor =
SearchLoadAndWriteProcessor.getProcessor();
processor.initialize(this, "preUpdate", null);
try {
if (!isNewKey) {
processor.doNetWrite(event, netWriteRecipients, localWriter,
SearchLoadAndWriteProcessor.BEFOREUPDATE);
}
else {
processor.doNetWrite(event, netWriteRecipients, localWriter,
SearchLoadAndWriteProcessor.BEFORECREATE);
}
}
finally {
processor.release();
}
}
finally {
getCachePerfStats().endCacheWriterCall(start);
}
}
serverPut(event, requireOldValue, expectedOldValue);
}
@Override
protected void cacheListenersChanged(boolean nowHasListener)
{
if (nowHasListener) {
this.advisorListener.initRMLWrappers();
}
new UpdateAttributesProcessor(this).distribute();
}
@Override
protected void cacheWriterChanged(CacheWriter oldWriter)
{
super.cacheWriterChanged(oldWriter);
if (isBridgeWriter(oldWriter)) {
oldWriter = null;
}
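// XOR: distribute a profile update only when the writer's presence changed,
// i.e. exactly one of the old and new writers is null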
if (oldWriter == null ^ basicGetWriter() == null) {
new UpdateAttributesProcessor(this).distribute();
}
}
@Override
protected void cacheLoaderChanged(CacheLoader oldLoader)
{
super.cacheLoaderChanged(oldLoader);
if (isBridgeLoader(oldLoader)) {
oldLoader = null;
}
if (oldLoader == null ^ basicGetLoader() == null) {
new UpdateAttributesProcessor(this).distribute();
}
}
public void addGatewaySenderId(String gatewaySenderId) {
super.addGatewaySenderId(gatewaySenderId);
new UpdateAttributesProcessor(this).distribute();
}
public void removeGatewaySenderId(String gatewaySenderId) {
super.removeGatewaySenderId(gatewaySenderId);
new UpdateAttributesProcessor(this).distribute();
}
public void addAsyncEventQueueId(String asyncEventQueueId) {
super.addAsyncEventQueueId(asyncEventQueueId);
new UpdateAttributesProcessor(this).distribute();
}
public void removeAsyncEventQueueId(String asyncEventQueueId) {
super.removeAsyncEventQueueId(asyncEventQueueId);
new UpdateAttributesProcessor(this).distribute();
}
public void checkSameSenderIdsAvailableOnAllNodes() {
List senderIds = this.getCacheDistributionAdvisor()
.adviseSameGatewaySenderIds(getGatewaySenderIds());
if (!senderIds.isEmpty()) {
throw new GatewaySenderConfigurationException(
LocalizedStrings.Region_REGION_0_HAS_1_GATEWAY_SENDER_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_GATEWAY_SENDER_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_GATEWAY_SENDER_IDS_SHOULD_BE_SAME
.toLocalizedString(new Object[] { this.getName(),
senderIds.get(0), senderIds.get(1) }));
}
List asyncQueueIds = this.getCacheDistributionAdvisor()
.adviseSameAsyncEventQueueIds(getAsyncEventQueueIds());
if (!asyncQueueIds.isEmpty()) {
throw new GatewaySenderConfigurationException(
LocalizedStrings.Region_REGION_0_HAS_1_ASYNC_EVENT_QUEUE_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_ASYNC_EVENT_QUEUE_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_ASYNC_EVENT_QUEUE_IDS_SHOULD_BE_SAME
.toLocalizedString(new Object[] { this.getName(),
asyncQueueIds.get(0), asyncQueueIds.get(1) }));
}
}
/**
* Wraps call to dlock service in order to throw RegionDestroyedException if
* dlock service throws IllegalStateException and isDestroyed is true.
*/
private boolean isLockingSuspendedByCurrentThread()
{
try {
return getLockService().isLockingSuspendedByCurrentThread();
}
catch (IllegalStateException e) {
lockCheckReadiness();
throw e;
}
}
/**
* If this region's scope is GLOBAL, get a distributed lock on the given key,
* and return the Lock. The sender is responsible for unlocking.
*
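* A usage sketch (caller code is illustrative only):
* <pre>
* Lock entryLock = getDistributedLockIfGlobal(key); // null unless GLOBAL
* try {
* // read-modify-write the entry
* } finally {
* if (entryLock != null) {
* entryLock.unlock();
* }
* }
* </pre>
*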
* @return the acquired Lock if the region is GLOBAL, otherwise null.
*
* @throws NullPointerException
* if key is null
*/
private Lock getDistributedLockIfGlobal(Object key) throws TimeoutException
{
if (getScope().isGlobal()) {
if (isLockingSuspendedByCurrentThread())
return null;
long start = System.currentTimeMillis();
long timeLeft = getCache().getLockTimeout();
long lockTimeout = timeLeft;
StringId msg = null;
Object[] msgArgs = null;
while (timeLeft > 0 || lockTimeout == -1) {
this.cache.getCancelCriterion().checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
Lock dlock = getDistributedLock(key);
if (!dlock.tryLock(timeLeft, TimeUnit.SECONDS)) {
msg = LocalizedStrings.DistributedRegion_ATTEMPT_TO_ACQUIRE_DISTRIBUTED_LOCK_FOR_0_FAILED_AFTER_WAITING_1_SECONDS;
msgArgs = new Object[] {key, Long.valueOf((System.currentTimeMillis() - start) / 1000L)};
break;
}
return dlock;
}
catch (InterruptedException ex) {
interrupted = true;
this.cache.getCancelCriterion().checkCancelInProgress(ex);
// FIXME Why is it OK to keep going?
if (lockTimeout > -1) {
timeLeft = getCache().getLockTimeout()
- ((System.currentTimeMillis() - start) / 1000L);
}
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // while
if (msg == null) {
msg = LocalizedStrings.DistributedRegion_TIMED_OUT_AFTER_WAITING_0_SECONDS_FOR_THE_DISTRIBUTED_LOCK_FOR_1;
msgArgs = new Object[] {Integer.valueOf(getCache().getLockTimeout()), key};
}
throw new TimeoutException(msg.toLocalizedString(msgArgs));
}
else {
return null;
}
}
/**
* Checks if the entry is an invalid (unusable) entry
*
* @return true if the entry is null, or is removed and not a tombstone
*
*/
protected boolean checkEntryNotValid(RegionEntry mapEntry)
{
return (mapEntry == null || (mapEntry.isRemoved() && !mapEntry.isTombstone()));
}
/**
* Get the best iterator for iterating over the contents of this
* region. This method will return either an iterator that uses hash
* ordering from the entry map or, in the case of an overflow
* region, an iterator that iterates over the entries in disk order.
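* <p>
* For example, for an overflow region whose values have been evicted to an
* oplog, this returns a DiskSavyIterator that visits overflowed entries
* grouped by disk page (see DiskPage below) rather than in hash order, so a
* full scan reads each disk page roughly once.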
*/
public Iterator<RegionEntry> getBestIterator(boolean includeValues) {
DiskRegion dr = this.getDiskRegion();
if (DiskPage.DISK_PAGE_SIZE > 0 && includeValues && dr != null) {
//Wait for the disk region to recover values first.
dr.waitForAsyncRecovery();
if(dr.getNumOverflowOnDisk() > 0) {
return new DiskSavyIterator();
}
}
return this.entries.regionEntries().iterator();
}
// /**
// * The maximum number of entries that can be put into the diskMap before
// * some of them are read from disk and returned by this iterator.
// * The larger this number the more memory this iterator is allowed to consume
// * and the better it will do in optimally reading the pending entries.
// */
// static final long MAX_PENDING_ENTRIES = Long.getLong("gemfire.MAX_PENDING_ENTRIES", 1000000).longValue();
/**
* Should only be used if this region has entries on disk that are not in memory.
* This currently happens for overflow and for recovery when values are not recovered.
* The first iteration does a normal iteration of the regionEntries.
* But if it finds an entry that is currently only on disk
* it saves it in a map sorted by its location on disk.
* Once the regionEntries iterator has nothing more to iterate
* it starts iterating over, in disk order, the entries on disk.
*/
private class DiskSavyIterator implements Iterator<RegionEntry> {
private boolean usingIt = true;
private Iterator<?> it = entries.regionEntries().iterator();
// iterator for nested ArrayLists
private Iterator<RegionEntry> subIt = null;
//private final ArrayList<DiskPosition> diskList = new ArrayList<DiskPosition>(/*@todo presize based on number of entries only on disk*/);
// value will be either RegionEntry or an ArrayList<RegionEntry>
// private long pendingCount = 0;
private final java.util.TreeMap<DiskPage, Object> diskMap = new java.util.TreeMap<DiskPage, Object>();
// /**
// * used to iterate over the fullest pages at the time we have
// * added MAX_PENDING_ENTRIES to diskMap;
// */
// private Iterator<Map.Entry<DiskPage, Object>> sortedDiskIt;
public DiskSavyIterator() {
}
public boolean hasNext() {
boolean result;
if (this.subIt != null) {
result = this.subIt.hasNext();
if (!result) {
this.subIt = null;
} else {
return result;
}
}
// if (this.sortedDiskIt != null) {
// result = this.sortedDiskIt.hasNext();
// if (!result) {
// this.sortedDiskIt = null;
// } else {
// return result;
// }
// }
result = this.it.hasNext();
if (this.usingIt && !result) {
this.usingIt = false;
// long start = System.currentTimeMillis();
// Collections.sort(this.diskList);
// long end = System.currentTimeMillis();
this.it = this.diskMap.values().iterator();
result = this.it.hasNext();
}
return result;
}
public RegionEntry next() {
for (;;) {
if (this.subIt != null) {
return this.subIt.next();
// } else if (this.sortedDiskIt != null) {
// Map.Entry<DiskPage, Object> me = this.sortedDiskIt.next();
// // remove the page from the diskMap.
// this.diskMap.remove(me.getKey());
// Object v = me.getValue();
// int size = 1;
// if (v instanceof ArrayList) {
// ArrayList al = (ArrayList)v;
// size = al.size();
// // set up the iterator to start returning the entries on that page
// this.subIt = al.iterator();
// v = this.subIt.next();
// }
// // decrement pendingCount by the number of entries on the page
// this.pendingCount -= size;
// // return the first region entry on this page
// return v;
}
if (this.usingIt) {
RegionEntry re = (RegionEntry)this.it.next();
DiskPosition dp = new DiskPosition();
if (re.isOverflowedToDisk(DistributedRegion.this, dp)) {
// add dp to sorted list
DiskPage dPage = new DiskPage(dp);
Object v = this.diskMap.get(dPage);
if (v == null) {
this.diskMap.put(dPage, re);
} else if (v instanceof ArrayList) {
ArrayList al = (ArrayList)v;
al.add(re);
} else {
ArrayList al = new ArrayList();
al.add(v);
al.add(re);
this.diskMap.put(dPage, al);
}
if (!hasNext()) {
assert false; // hasNext() must be true: we just added an entry to diskMap
}
// this.pendingCount++;
// if (this.usingIt && this.pendingCount >= MAX_PENDING_ENTRIES) {
// // find the pages that have the most entries
// int largestPage = 1;
// ArrayList<Map.Entry<DiskPage, Object>> largestPages
// = new ArrayList<Map.Entry<DiskPage, Object>>();
// for (Map.Entry<DiskPage, Object> me: this.diskMap.entrySet()) {
// int meSize = 1;
// if (me.getValue() instanceof ArrayList) {
// meSize = ((ArrayList)me.getValue()).size();
// }
// if (meSize > largestPage) {
// largestPage = meSize;
// largestPages.clear(); // throw away smaller pages
// largestPages.add(me);
// } else if (meSize == largestPage) {
// largestPages.add(me);
// } else {
// // ignore this page
// }
// }
// Collections.sort(largestPages, new Comparator
// <Map.Entry<DiskPage, Object>>() {
// /**
// * Note: this comparator imposes orderings that are inconsistent
// * with equals.
// */
// public int compare(Map.Entry<DiskPage, Object> o1, Map.Entry<DiskPage, Object> o2) {
// return o1.getKey().compareTo(o2.getKey());
// }
// });
// this.sortedDiskIt = largestPages.iterator();
// // loop around and fetch first value from sortedDiskIt
// }
} else {
return re;
}
} else {
Object v = this.it.next();
if (v instanceof ArrayList) {
ArrayList al = (ArrayList)v;
this.subIt = al.iterator();
return this.subIt.next();
} else {
return (RegionEntry) v;
}
}
}
}
public void remove() {
throw new UnsupportedOperationException();
}
}
public static class DiskPosition implements Comparable<DiskPosition> {
private long oplogId;
private long offset;
DiskPosition() {
}
void setPosition(long oplogId, long offset) {
this.oplogId = oplogId;
this.offset = offset;
}
@Override
public int hashCode() {
return Long.valueOf(this.oplogId ^ this.offset).hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof DiskPosition) {
DiskPosition other = (DiskPosition)o;
return this.oplogId == other.oplogId && this.offset == other.offset;
} else {
return false;
}
}
public int compareTo(DiskPosition o) {
int result = Long.signum(this.oplogId - o.oplogId);
if (result == 0) {
result = Long.signum(this.offset - o.offset);
}
return result;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("<").append(this.oplogId).append(":").append(this.offset).append(">");
return sb.toString();
}
}
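/**
* Groups DiskPositions into fixed-size pages so that entries living close
* together in an oplog are read together. For example, with the default
* gemfire.DISK_PAGE_SIZE of 8192 bytes, oplog offsets 0..8191 collapse to
* page 0 and offsets 8192..16383 to page 1; DiskSavyIterator's TreeMap then
* yields those pages in ascending disk order.
*/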
static class DiskPage extends DiskPosition {
static final long DISK_PAGE_SIZE = Long.getLong("gemfire.DISK_PAGE_SIZE", 8 * 1024L).longValue();
DiskPage(DiskPosition dp) {
this.setPosition(dp.oplogId, dp.offset / DISK_PAGE_SIZE);
}
}
/**
* Returns the lock lease value to use for DistributedLock and
* RegionDistributedLock. A lease of -1 means the lock never expires.
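* For example, a cache lock-lease of 120 (seconds) is returned as 120000 ms.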
*/
protected long getLockLeaseForLock()
{
if (getCache().getLockLease() == -1) {
return -1;
}
return getCache().getLockLease() * 1000;
}
/**
* Returns the lock timeout value to use for DistributedLock and
* RegionDistributedLock. -1 is supported as a lock that never times out.
*/
protected long getLockTimeoutForLock(long time, TimeUnit unit)
{
if (time == -1) {
return -1;
}
return TimeUnit.MILLISECONDS.convert(time, unit);
}
/** ******************* DistributedLock ************************************* */
private class DistributedLock implements Lock
{
private final Object key;
public DistributedLock(Object key) {
this.key = key;
}
public void lock()
{
try {
boolean locked = basicTryLock(-1, TimeUnit.MILLISECONDS, false);
if (!locked) {
lockCheckReadiness();
}
Assert.assertTrue(locked, "Failed to acquire DistributedLock");
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
lockCheckReadiness();
Assert.assertTrue(false, "Failed to acquire DistributedLock");
}
}
public void lockInterruptibly() throws InterruptedException
{
try {
boolean locked = basicTryLock(-1, TimeUnit.MILLISECONDS, true);
if (!locked) {
lockCheckReadiness();
}
Assert.assertTrue(locked, "Failed to acquire DistributedLock");
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
public boolean tryLock()
{
try {
ReplyProcessor21.forceSevereAlertProcessing();
return getLockService().lock(this.key, 0, getLockLeaseForLock());
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
finally {
ReplyProcessor21.unforceSevereAlertProcessing();
}
}
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return basicTryLock(time, unit, true);
}
private boolean basicTryLock(long time, TimeUnit unit, boolean interruptible)
throws InterruptedException {
// if (Thread.interrupted()) throw new InterruptedException(); not necessary lockInterruptibly does this
final DM dm = getDistributionManager();
long start = System.currentTimeMillis();
long timeoutMS = getLockTimeoutForLock(time, unit);
long end;
if (timeoutMS < 0) {
timeoutMS = Long.MAX_VALUE;
end = Long.MAX_VALUE;
}
else {
end = start + timeoutMS;
}
long ackSAThreshold = getSystem().getConfig().getAckSevereAlertThreshold() * 1000;
boolean suspected = false;
boolean severeAlertIssued = false;
DistributedMember lockHolder = null;
long waitInterval;
long ackWaitThreshold;
if (ackSAThreshold > 0) {
ackWaitThreshold = getSystem().getConfig().getAckWaitThreshold() * 1000;
waitInterval = ackWaitThreshold;
}
else {
waitInterval = timeoutMS;
ackWaitThreshold = 0;
}
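// Wait-interval schedule, assuming the documented defaults of
// ack-wait-threshold=15s and ack-severe-alert-threshold disabled (0): the
// loop below simply waits out the full timeout. With a severe-alert
// threshold configured, each attempt first waits ackWaitThreshold ms, then,
// once the lock holder is suspected, ackSAThreshold ms per attempt.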
do {
try {
waitInterval = Math.min(end-System.currentTimeMillis(), waitInterval);
ReplyProcessor21.forceSevereAlertProcessing();
final boolean gotLock;
if (interruptible) {
gotLock = getLockService().lockInterruptibly(this.key,
waitInterval, getLockLeaseForLock());
}
else {
gotLock = getLockService().lock(this.key,
waitInterval, getLockLeaseForLock());
}
if (gotLock) {
return true;
}
if (ackSAThreshold > 0) {
long elapsed = System.currentTimeMillis() - start;
if (elapsed > ackWaitThreshold) {
if (!suspected) {
// start suspect processing on the holder of the lock
suspected = true;
severeAlertIssued = false; // in case this is a new lock holder
waitInterval = ackSAThreshold;
DLockRemoteToken remoteToken =
((DLockService)getLockService()).queryLock(key);
lockHolder = remoteToken.getLessee();
if (lockHolder != null) {
dm.getMembershipManager()
.suspectMember(lockHolder,
"Has not released a global region entry lock in over "
+ ackWaitThreshold / 1000 + " seconds");
}
}
else if (elapsed > ackSAThreshold) {
DLockRemoteToken remoteToken =
((DLockService)getLockService()).queryLock(key);
if (lockHolder != null && remoteToken.getLessee() != null
&& lockHolder.equals(remoteToken.getLessee())) {
if (!severeAlertIssued) {
severeAlertIssued = true;
logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_0_SECONDS_HAVE_ELAPSED_WAITING_FOR_GLOBAL_REGION_ENTRY_LOCK_HELD_BY_1,
new Object[] {Long.valueOf(ackWaitThreshold+ackSAThreshold), lockHolder}));
}
}
else {
// the lock holder has changed
suspected = false;
waitInterval = ackWaitThreshold;
lockHolder = null;
}
}
}
} // ackSAThreshold processing
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
finally {
ReplyProcessor21.unforceSevereAlertProcessing();
}
} while (System.currentTimeMillis() < end);
return false;
}
public void unlock()
{
try {
ReplyProcessor21.forceSevereAlertProcessing();
getLockService().unlock(this.key);
if (!DistributedRegion.this.entries.containsKey(key)) {
getLockService().freeResources(key);
}
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
finally {
ReplyProcessor21.unforceSevereAlertProcessing();
}
}
public Condition newCondition() {
throw new UnsupportedOperationException(LocalizedStrings.DistributedRegion_NEWCONDITION_UNSUPPORTED.toLocalizedString());
}
}
/////////////////// RegionDistributedLock //////////////////
private class RegionDistributedLock implements Lock
{
public RegionDistributedLock() {
}
public void lock()
{
try {
boolean locked = getLockService().suspendLocking(-1);
Assert.assertTrue(locked, "Failed to acquire RegionDistributedLock");
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
public void lockInterruptibly() throws InterruptedException
{
// if (Thread.interrupted()) throw new InterruptedException(); not necessary suspendLockingInterruptibly does this
try {
boolean locked = getLockService().suspendLockingInterruptibly(-1);
Assert.assertTrue(locked, "Failed to acquire RegionDistributedLock");
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
public boolean tryLock()
{
try {
return getLockService().suspendLocking(0);
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException
{
// if (Thread.interrupted()) throw new InterruptedException(); not necessary, suspendLockingInterruptibly does this
try {
return getLockService().suspendLockingInterruptibly(
getLockTimeoutForLock(time, unit));
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
public void unlock()
{
try {
getLockService().resumeLocking();
}
catch (IllegalStateException ex) {
lockCheckReadiness();
throw ex;
}
}
public Condition newCondition() {
throw new UnsupportedOperationException(LocalizedStrings.DistributedRegion_NEWCONDITION_UNSUPPORTED.toLocalizedString());
}
}
// TODO: add in region locking for destroy and invalidate...
/**
* If this region's scope is GLOBAL, get the region distributed lock. The
* sender is responsible for unlocking.
*
* @return the acquired Lock if the region is GLOBAL and locking is not
* already suspended, otherwise null.
*/
Lock getRegionDistributedLockIfGlobal() throws TimeoutException
{
if (getScope().isGlobal()) {
if (isLockingSuspendedByCurrentThread())
return null;
Lock dlock = getRegionDistributedLock();
dlock.lock();
return dlock;
}
return null;
}
/*
* void localDestroyRegion(Object aCallbackArgument) { try { Lock dlock =
* this.getRegionDistributedLockIfGlobal(); try {
* super.localDestroyRegion(aCallbackArgument); } finally { if (dlock != null) {
* dlock.unlock(); } } } catch (TimeoutException e) { throw new
* GemFireCacheException("localDestroyRegion timed out", e); } }
*
* void destroyRegion(Object aCallbackArgument) throws CacheWriterException,
* TimeoutException { Lock dlock = this.getRegionDistributedLockIfGlobal();
* try { super.destroyRegion(aCallbackArgument); } finally { if (dlock !=
* null) { dlock.unlock(); } } }
*
* void invalidateRegion(Object aCallbackArgument) throws TimeoutException {
* Lock dlock = this.getRegionDistributedLockIfGlobal(); try {
* super.invalidateRegion(aCallbackArgument); } finally { if (dlock != null) {
* dlock.unlock(); } } }
*/
/**
* Distribute the PutAllOp.
* This implementation distributes it to peers.
* @since 5.7
*/
@Override
public void postPutAllSend(DistributedPutAllOperation putAllOp, VersionedObjectList successfulPuts) {
if (putAllOp.putAllDataSize > 0) {
putAllOp.distribute();
} else {
if (logger.isDebugEnabled()) {
logger.debug("DR.postPutAll: no data to distribute");
}
}
}
@Override
public void postRemoveAllSend(DistributedRemoveAllOperation op, VersionedObjectList successfulOps) {
if (op.removeAllDataSize > 0) {
op.distribute();
} else {
getCache().getLoggerI18n().fine("DR.postRemoveAll: no data to distribute");
}
}
@Override
public VersionedObjectList basicPutAll(final Map<?, ?> map,
final DistributedPutAllOperation putAllOp, final Map<Object, VersionTag> retryVersions) {
Lock dlock = this.getRegionDistributedLockIfGlobal();
try {
return super.basicPutAll(map, putAllOp, retryVersions);
} finally {
if (dlock != null) {
dlock.unlock();
}
}
}
@Override
public VersionedObjectList basicRemoveAll(final Collection<Object> keys,
final DistributedRemoveAllOperation removeAllOp, final ArrayList<VersionTag> retryVersions) {
Lock dlock = this.getRegionDistributedLockIfGlobal();
try {
return super.basicRemoveAll(keys, removeAllOp, retryVersions);
} finally {
if (dlock != null) {
dlock.unlock();
}
}
}
/** Returns true if any required roles are currently missing */
boolean isMissingRequiredRoles()
{
return this.isMissingRequiredRoles;
}
/**
* Returns the missing required roles after waiting up to the timeout
*
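* <p>
* A minimal usage sketch (caller code is illustrative only):
* <pre>
* // wait up to 30 seconds; -1 waits forever, 0 returns immediately
* Set missing = region.waitForRequiredRoles(30000L);
* if (!missing.isEmpty()) {
* // take corrective action for the roles still missing
* }
* </pre>
* @param timeout milliseconds to wait; -1 means wait forever, 0 means do not wait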
* @throws IllegalStateException
* if region is not configured with required roles
* @throws InterruptedException TODO-javadocs
*/
public Set waitForRequiredRoles(long timeout) throws InterruptedException
{
if (Thread.interrupted()) throw new InterruptedException();
checkReadiness();
if (!getMembershipAttributes().hasRequiredRoles()) {
throw new IllegalStateException(LocalizedStrings.DistributedRegion_REGION_HAS_NOT_BEEN_CONFIGURED_WITH_REQUIRED_ROLES.toLocalizedString());
}
if (!this.isMissingRequiredRoles) { // should we delete this check?
if (logger.isDebugEnabled()) {
logger.debug("No missing required roles to wait for.");
}
return Collections.EMPTY_SET; // early-out: no missing required roles
}
if (timeout != 0) { // if timeout is zero then fall through past waits
if (timeout == -1) { // infinite timeout
while (this.isMissingRequiredRoles) {
checkReadiness();
this.cache.getCancelCriterion().checkCancelInProgress(null); // bail if distribution has stopped
synchronized (this.missingRequiredRoles) {
// one more check while synced
if (this.isMissingRequiredRoles) {
if (logger.isDebugEnabled()) {
logger.debug("About to wait for missing required roles.");
}
// TODO an infinite wait here might be a problem...
this.missingRequiredRoles.wait(); // spurious wakeup ok
}
}
}
}
else { // use the timeout
long endTime = System.currentTimeMillis() + timeout;
while (this.isMissingRequiredRoles) {
checkReadiness();
this.cache.getCancelCriterion().checkCancelInProgress(null); // bail if distribution has stopped
synchronized (this.missingRequiredRoles) {
// one more check while synced
if (this.isMissingRequiredRoles) {
long timeToWait = endTime - System.currentTimeMillis();
if (timeToWait > 0) {
if (logger.isDebugEnabled()) {
logger.debug("About to wait up to {} milliseconds for missing required roles.", timeToWait);
}
this.missingRequiredRoles.wait(timeToWait); // spurious wakeup ok
}
else {
break;
}
}
}
}
}
}
// check readiness again: thread may have been notified at destroy time
checkReadiness();
if (this.isMissingRequiredRoles) {
// sync on missingRequiredRoles to prevent mods to required role status...
synchronized (this.missingRequiredRoles) {
return Collections.unmodifiableSet(new HashSet(
this.missingRequiredRoles));
}
}
else {
return Collections.EMPTY_SET;
}
}
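/*
* Usage sketch (hedged; the region variable is hypothetical). A timeout of 0
* returns immediately, -1 waits indefinitely, and any other value is the
* number of milliseconds to wait:
*
*   Set missing = distributedRegion.waitForRequiredRoles(30000L);
*   if (!missing.isEmpty()) {
*     // some required roles are still absent after 30 seconds
*   }
*/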
/** Returns true if the role is currently present in this region's membership. */
public boolean isRoleInRegionMembership(Role role)
{
checkReadiness();
return basicIsRoleInRegionMembership(role);
}
protected boolean basicIsRoleInRegionMembership(Role role)
{
if (getSystem().getDistributedMember().getRoles().contains(role)) {
// since we are playing the role
return true;
}
Set members = this.distAdvisor.adviseGeneric();
for (Iterator iter = members.iterator(); iter.hasNext();) {
DistributedMember member = (DistributedMember)iter.next();
Set roles = member.getRoles();
if (roles.contains(role)) {
return true;
}
}
return false;
}
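/*
* Example (a sketch, assuming the region is configured with required roles):
* getMembershipAttributes().getRequiredRoles() can be combined with
* isRoleInRegionMembership to report which required roles are present.
*
*   for (Iterator it = getMembershipAttributes().getRequiredRoles().iterator(); it.hasNext();) {
*     Role role = (Role)it.next();
*     boolean present = isRoleInRegionMembership(role);
*     // ... report or act on the role's presence ...
*   }
*/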
@Override
public void remoteRegionInitialized(CacheProfile profile) {
synchronized(this.advisorListener) {
if (this.advisorListener.members == null && hasListener()) {
Object callback = TEST_HOOK_ADD_PROFILE? profile : null;
RegionEventImpl event = new RegionEventImpl(this,
Operation.REGION_CREATE, callback, true, profile.peerMemberId);
dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_CREATE,
event);
}
}
}
@Override
protected void removeSenderFromAdvisor(InternalDistributedMember sender, int serial, boolean regionDestroyed)
{
getDistributionAdvisor().removeIdWithSerial(sender, serial, regionDestroyed);
}
/** Doesn't throw RegionDestroyedException; used by CacheDistributionAdvisor. */
public DistributionAdvisee getParentAdvisee() {
return (DistributionAdvisee) basicGetParentRegion();
}
/**
* Used to get membership events from our advisor to implement
* RegionMembershipListener invocations.
*
* @since 5.0
*/
protected class AdvisorListener implements MembershipListener
{
private Set members = new HashSet();
protected boolean destroyed = false;
protected synchronized void addMembers(Set newMembers)
{
this.members.addAll(newMembers);
}
protected synchronized Set getInitialMembers()
{
Set initMembers = this.members;
this.members = null;
return initMembers;
}
public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
}
public void memberSuspect(InternalDistributedMember id,
InternalDistributedMember whoSuspected) {
}
/** called when membership listeners are added after region creation */
protected synchronized void initRMLWrappers() {
Set membersWithThisRegion = DistributedRegion.this.distAdvisor.adviseGeneric();
initPostCreateRegionMembershipListeners(membersWithThisRegion);
}
public synchronized void memberJoined(InternalDistributedMember id)
{
if (this.destroyed) {
return;
}
if (this.members != null) {
this.members.add(id);
}
// bug #44684 - do not notify listener of create until remote member is initialized
// if (this.members == null && hasListener()) {
// RegionEventImpl event = new RegionEventImpl(DistributedRegion.this,
// Operation.REGION_CREATE, null, true, id);
// dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_CREATE,
// event);
// }
if (getMembershipAttributes().hasRequiredRoles()) {
// newlyAcquiredRoles is used for intersection and RoleEvent
Set newlyAcquiredRoles = Collections.EMPTY_SET;
synchronized (missingRequiredRoles) {
if (isMissingRequiredRoles) {
Set roles = id.getRoles();
newlyAcquiredRoles = new HashSet(missingRequiredRoles);
newlyAcquiredRoles.retainAll(roles); // find the intersection
if (!newlyAcquiredRoles.isEmpty()) {
if (DistributedRegion.this.rmq != null) {
Iterator it = newlyAcquiredRoles.iterator();
final DM dm = getDistributionManager();
while (it.hasNext()) {
getCache().getCancelCriterion().checkCancelInProgress(null);
final Role role = (Role)it.next();
try {
// do this in the waiting pool to make it async
// @todo darrel/klund: add a single serial executor for
// queue flush
dm.getWaitingThreadPool().execute(new Runnable() {
public void run()
{
DistributedRegion.this.rmq.roleReady(role);
}
});
break;
}
catch (RejectedExecutionException ex) {
throw ex;
}
} // while
}
missingRequiredRoles.removeAll(newlyAcquiredRoles);
if (this.members == null && missingRequiredRoles.isEmpty()) {
isMissingRequiredRoles = false;
getCachePerfStats().incReliableRegionsMissing(-1);
if (getMembershipAttributes().getLossAction().isAllAccess())
getCachePerfStats().incReliableRegionsMissingFullAccess(-1); // rahul
else if (getMembershipAttributes().getLossAction()
.isLimitedAccess())
getCachePerfStats()
.incReliableRegionsMissingLimitedAccess(-1);
else if (getMembershipAttributes().getLossAction().isNoAccess())
getCachePerfStats().incReliableRegionsMissingNoAccess(-1);
boolean async = resumeReliability(id, newlyAcquiredRoles);
if (async) {
this.destroyed = true;
}
}
}
}
if (!this.destroyed) {
// any number of threads may be waiting on missingRequiredRoles
missingRequiredRoles.notifyAll();
}
}
if (!this.destroyed && this.members == null && hasListener()) {
if (!newlyAcquiredRoles.isEmpty()) {
// fire afterRoleGain event
RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this,
Operation.REGION_CREATE, null, true, id, newlyAcquiredRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_GAIN, relEvent);
}
}
}
}
public synchronized void memberDeparted(InternalDistributedMember id,
boolean crashed)
{
if (this.destroyed) {
return;
}
if (this.members != null) {
this.members.remove(id);
}
if (this.members == null && hasListener()) {
RegionEventImpl event = new RegionEventImpl(DistributedRegion.this,
Operation.REGION_CLOSE, null, true, id);
if (crashed) {
dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_CRASH,
event);
}
else {
// @todo darrel: it would be nice to know if what actual op was done
// could be close, local destroy, or destroy (or load snap?)
if (DestroyRegionOperation.isRegionDepartureNotificationOk()) {
dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_DEPARTURE, event);
}
}
}
if (getMembershipAttributes().hasRequiredRoles()) {
Set newlyMissingRoles = Collections.EMPTY_SET;
synchronized (missingRequiredRoles) {
Set roles = id.getRoles();
for (Iterator iter = roles.iterator(); iter.hasNext();) {
Role role = (Role)iter.next();
if (getMembershipAttributes().getRequiredRoles().contains(role)
&& !basicIsRoleInRegionMembership(role)) {
if (newlyMissingRoles == Collections.EMPTY_SET) {
newlyMissingRoles = new HashSet();
}
newlyMissingRoles.add(role);
if (this.members == null && !isMissingRequiredRoles) {
isMissingRequiredRoles = true;
getCachePerfStats().incReliableRegionsMissing(1);
if (getMembershipAttributes().getLossAction().isAllAccess())
getCachePerfStats().incReliableRegionsMissingFullAccess(1); // rahul
else if (getMembershipAttributes().getLossAction()
.isLimitedAccess())
getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
else if (getMembershipAttributes().getLossAction().isNoAccess())
getCachePerfStats().incReliableRegionsMissingNoAccess(1);
boolean async = lostReliability(id, newlyMissingRoles);
if (async) {
this.destroyed = true;
}
}
}
}
if (!this.destroyed) {
missingRequiredRoles.addAll(newlyMissingRoles);
// any number of threads may be waiting on missingRequiredRoles...
missingRequiredRoles.notifyAll();
}
}
if (!this.destroyed && this.members == null && hasListener()) {
if (!newlyMissingRoles.isEmpty()) {
// fire afterRoleLoss event
RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this,
Operation.REGION_CLOSE, null, true, id, newlyMissingRoles);
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
}
}
}
}
}
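/*
* The AdvisorListener above is what ultimately drives application-level
* RegionMembershipListener callbacks. A minimal application-side sketch
* (assuming the RegionMembershipListenerAdapter convenience class):
*
*   class MyMembershipListener extends RegionMembershipListenerAdapter {
*     public void afterRemoteRegionCreate(RegionEvent event) {
*       // fired only after the remote member's region is initialized (see bug #44684)
*     }
*     public void afterRemoteRegionDeparture(RegionEvent event) {
*       // remote member closed or destroyed its region
*     }
*     public void afterRemoteRegionCrash(RegionEvent event) {
*       // remote member crashed
*     }
*   }
*/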
/**
* Used to bootstrap txState.
*
* @param key the key info for the entry being targeted
* @return the member to own the transaction: a replicate member for
* distributed regions, or the member with the primary bucket for
* partitioned regions
*/
@Override
public DistributedMember getOwnerForKey(KeyInfo key) {
//Asif: fix for sqlfabric bug 42266
assert !this.isInternalRegion() || this.isMetaRegionWithTransactions();
if (!this.getAttributes().getDataPolicy().withStorage()
|| (this.concurrencyChecksEnabled && this.getAttributes()
.getDataPolicy() == DataPolicy.NORMAL)) {
// execute on random replicate
return getRandomReplicate();
}
// if we are non-persistent, forward transactions to
// a persistent member
if (this.concurrencyChecksEnabled && !generateVersionTag) {
return getRandomPersistentReplicate();
}
return super.getOwnerForKey(key);
}
/**
* Execute the provided named function in all locations that contain the given
* keys. The function can be executed on just one fabric node, in parallel on
* a subset of nodes, or in parallel across all the nodes.
*
* @param function the function to execute
* @param args the arguments passed to the function
* @since 5.8
*/
@Override
public ResultCollector executeFunction(
final DistributedRegionFunctionExecutor execution,
final Function function, final Object args,
final ResultCollector rc, final Set filter,
final ServerToClientFunctionResultSender sender) {
DistributedMember target = getTransactionalNode();
if (target != null) {
if (target.equals(getMyId())) {
return executeLocally(execution, function, args, 0, rc, filter, sender);
}
return executeOnReplicate(execution, function, args, rc, filter, target);
} else if (this.getAttributes().getDataPolicy().withReplication()
|| this.getAttributes().getDataPolicy().withPreloaded()) {
// execute locally
final Set<InternalDistributedMember> singleMember = Collections
.singleton(getMyId());
execution.validateExecution(function, singleMember);
execution.setExecutionNodes(singleMember);
return executeLocally(execution, function, args, 0, rc, filter, sender);
} else {
// select a random replicate
target = getRandomReplicate();
if (target == null) {
throw new FunctionException(LocalizedStrings
.DistributedRegion_NO_REPLICATED_REGION_FOUND_FOR_EXECUTING_FUNCTION_0
.toLocalizedString(function.getId()));
}
}
final LocalResultCollector<?, ?> localRC = execution
.getLocalResultCollector(function, rc);
return executeOnReplicate(execution, function, args, localRC, filter, target);
}
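/*
* Application-side sketch (hedged; the region, keys, args and function id are
* hypothetical): this method is normally reached through the function
* execution API, along the lines of
*
*   ResultCollector rc = FunctionService.onRegion(region)
*       .withFilter(keys)
*       .withArgs(args)
*       .execute("myFunction");
*   Object result = rc.getResult();
*/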
private ResultCollector executeOnReplicate(
final DistributedRegionFunctionExecutor execution,
final Function function, final Object args, ResultCollector rc,
final Set filter, final DistributedMember target) {
final Set singleMember = Collections.singleton(target);
execution.validateExecution(function, singleMember);
execution.setExecutionNodes(singleMember);
HashMap<InternalDistributedMember, Object> memberArgs = new HashMap<InternalDistributedMember, Object>();
memberArgs.put((InternalDistributedMember)target, execution.getArgumentsForMember(target.getId()));
ResultSender resultSender = new DistributedRegionFunctionResultSender(null, rc,
function, execution.getServerResultSender());
DistributedRegionFunctionResultWaiter waiter = new DistributedRegionFunctionResultWaiter(
this.getSystem(), this.getFullPath(), rc, function, filter,
Collections.singleton(target), memberArgs, resultSender);
rc = waiter.getFunctionResultFrom(Collections.singleton(target),
function, execution);
return rc;
}
/**
* @return the node on which a transaction is already in progress, or null
* if there is none
*/
private DistributedMember getTransactionalNode() {
if (cache.getTxManager().getTXState() != null) {
return cache.getTxManager().getTXState().getTarget();
}
return null;
}
/**
* Implementation of {@link ProfileVisitor} that selects a random replicated
* member from the available ones for this region.
*/
static final class GetRandomReplicate implements
ProfileVisitor<DistributedMember> {
private boolean onlyPersistent = false;
InternalDistributedMember member = null;
private int randIndex = -1;
public GetRandomReplicate() {
}
public GetRandomReplicate(boolean onlyPersistent) {
this.onlyPersistent = onlyPersistent;
}
public boolean visit(DistributionAdvisor advisor, Profile profile,
int profileIndex, int numProfiles, DistributedMember member) {
final CacheProfile cp = (CacheProfile)profile;
if (this.randIndex < 0) {
this.randIndex = PartitionedRegion.rand.nextInt(numProfiles);
}
if (cp.dataPolicy.withReplication() && cp.regionInitialized
&& !cp.memberUnInitialized) {
if (onlyPersistent && !cp.dataPolicy.withPersistence()) {
return true;
}
// store the last replicated member in any case since in the worst case
// there may be no replicated node after "randIndex" in which case the
// last visited member will be used
this.member = cp.getDistributedMember();
if (profileIndex >= this.randIndex) {
return false;
}
}
return true;
}
}
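/*
* Worked example of the selection above: randIndex is drawn once, every
* qualifying replicate seen is remembered, and visiting stops at the first
* qualifying replicate whose profileIndex is at or beyond randIndex. With
* replicates at profile indexes {0, 2, 5} and randIndex 3, the member at
* index 5 is chosen; with randIndex 6, no replicate follows it, so the last
* one seen (index 5) is still used.
*/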
/**
* @return a random replicate, or null if there are none
*/
public InternalDistributedMember getRandomReplicate() {
/* [sumedh] The old code caused creation of an unnecessary HashSet
* and its population with all replicates (which may be large), then a
* copy into an array and finally selection of a random one from that.
* The new approach uses a much more efficient visitor instead.
Set replicates = this.getCacheDistributionAdvisor().adviseReplicates();
if (replicates.isEmpty()) {
return null;
}
return (InternalDistributedMember)(replicates
.toArray()[new Random().nextInt(replicates.size())]);
*/
final GetRandomReplicate getReplicate = new GetRandomReplicate();
this.getCacheDistributionAdvisor().accept(getReplicate, null);
return getReplicate.member;
}
/**
* @return a random persistent replicate, or null if there is none
*/
public InternalDistributedMember getRandomPersistentReplicate() {
final GetRandomReplicate getPersistentReplicate = new GetRandomReplicate(true);
this.getCacheDistributionAdvisor().accept(getPersistentReplicate, null);
return getPersistentReplicate.member;
}
void executeOnRegion(DistributedRegionFunctionStreamingMessage msg,
final Function function, final Object args, int prid,
final Set filter, boolean isReExecute) throws IOException {
final DM dm = getDistributionManager();
ResultSender resultSender = new DistributedRegionFunctionResultSender(dm, msg, function);
final RegionFunctionContextImpl context = new RegionFunctionContextImpl(
function.getId(), this, args, filter, null, null, resultSender,
isReExecute);
FunctionStats stats = FunctionStats.getFunctionStats(function.getId(), dm.getSystem());
try {
long start = stats.startTime();
stats.startFunctionExecution(function.hasResult());
function.execute(context);
stats.endFunctionExecution(start,function.hasResult());
}
catch (FunctionException functionException) {
if (logger.isDebugEnabled()) {
logger.debug("FunctionException occured on remote node while executing Function: {}", function.getId(), functionException);
}
stats.endFunctionExecutionWithException(function.hasResult());
throw functionException;
}
catch (CacheClosedException cacheClosedexception) {
if (logger.isDebugEnabled()) {
logger.debug("CacheClosedException occured on remote node while executing Function: {}", function.getId(), cacheClosedexception);
}
throw cacheClosedexception;
}
catch (Exception exception) {
if (logger.isDebugEnabled()) {
logger.debug("Exception occured on remote node while executing Function: {}", function.getId(), exception);
}
stats.endFunctionExecutionWithException(function.hasResult());
throw new FunctionException(exception);
}
}
ResultCollector executeLocally(
final DistributedRegionFunctionExecutor execution,
final Function function, final Object args, int prid,
final ResultCollector rc, final Set filter,
final ServerToClientFunctionResultSender sender) {
final LocalResultCollector<?, ?> localRC = execution
.getLocalResultCollector(function, rc);
final DM dm = getDistributionManager();
final DistributedRegionFunctionResultSender resultSender = new DistributedRegionFunctionResultSender(
dm, localRC, function, sender);
final RegionFunctionContextImpl context = new RegionFunctionContextImpl(
function.getId(), DistributedRegion.this, args, filter, null, null,
resultSender, execution.isReExecute());
execution.executeFunctionOnLocalNode(function, context, resultSender, dm, isTX());
return localRC;
}
@Override
protected void setMemoryThresholdFlag(MemoryEvent event) {
Set<InternalDistributedMember> others = getCacheDistributionAdvisor().adviseGeneric();
if (event.isLocal() || others.contains(event.getMember())) {
if (event.getState().isCritical()
&& !event.getPreviousState().isCritical()
&& (event.getType() == ResourceType.HEAP_MEMORY || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
setMemoryThresholdReachedCounterTrue(event.getMember());
} else if (!event.getState().isCritical()
&& event.getPreviousState().isCritical()
&& (event.getType() == ResourceType.HEAP_MEMORY || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
removeMemberFromCriticalList(event.getMember());
}
}
}
@Override
public void removeMemberFromCriticalList(DistributedMember member) {
if (logger.isDebugEnabled()) {
logger.debug("DR: removing member {} from critical member list", member);
}
synchronized(this.memoryThresholdReachedMembers) {
this.memoryThresholdReachedMembers.remove(member);
if (this.memoryThresholdReachedMembers.size() == 0) {
memoryThresholdReached.set(false);
}
}
}
@Override
public Set<DistributedMember> getMemoryThresholdReachedMembers() {
synchronized (this.memoryThresholdReachedMembers) {
return Collections.unmodifiableSet(this.memoryThresholdReachedMembers);
}
}
@Override
public void initialCriticalMembers(boolean localMemoryIsCritical,
Set<InternalDistributedMember> critialMembers) {
Set<InternalDistributedMember> others = getCacheDistributionAdvisor().adviseGeneric();
for (InternalDistributedMember idm: critialMembers) {
if (others.contains(idm)) {
setMemoryThresholdReachedCounterTrue(idm);
}
}
}
/**
* @param idm member whose threshold has been exceeded
*/
private void setMemoryThresholdReachedCounterTrue(final DistributedMember idm) {
synchronized(this.memoryThresholdReachedMembers) {
this.memoryThresholdReachedMembers.add(idm);
if (this.memoryThresholdReachedMembers.size() > 0) {
memoryThresholdReached.set(true);
}
}
}
/**
* Fetch the version tag for the given key from a remote replicate member.
*
* @param key the key whose version tag is to be fetched
* @throws EntryNotFoundException if the entry is not found on the replicate member
* @return the VersionTag for the key, or null if no replicate member could be reached
*/
protected VersionTag fetchRemoteVersionTag(Object key) {
VersionTag tag = null;
assert this.dataPolicy != DataPolicy.REPLICATE;
TransactionId txId = cache.getCacheTransactionManager().suspend();
try {
boolean retry = true;
InternalDistributedMember member = getRandomReplicate();
while (retry) {
try {
if (member == null) {
break;
}
FetchVersionResponse response = RemoteFetchVersionMessage.send(member, this, key);
tag = response.waitForResponse();
retry = false;
} catch (RemoteOperationException e) {
member = getRandomReplicate();
if (member != null) {
if (logger.isDebugEnabled()) {
logger.debug("Retrying RemoteFetchVersionMessage on member:{}", member);
}
}
}
}
} finally {
if (txId != null) {
cache.getCacheTransactionManager().resume(txId);
}
}
return tag;
}
/**
* Test hook for bug 48578. Returns true if this region sees a net loader,
* false otherwise.
*/
public boolean hasNetLoader() {
return this.hasNetLoader(getCacheDistributionAdvisor());
}
}