// blob: e90f27b4d81f85bf015011a5744849bc46bce096 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.MessageWithReply;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyMessage;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/**
* Handles the creation of a {@link CacheDistributionAdvisee} and the initial
* exchange of
* com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.Profiles
* that goes with it. The class name still says CreateRegion only to avoid
* merge conflicts.
* @author Eric Zoerner
*/
public class CreateRegionProcessor implements ProfileExchangeProcessor {
/** Logger used by this processor and by its nested reply-processor and message classes. */
private static final Logger logger = LogService.getLogger();
/** The advisee whose creation is being announced; set by the constructor. */
protected CacheDistributionAdvisee newRegion;
/**
 * Creates a new instance of CreateRegionProcessor.
 *
 * @param newRegion the advisee whose creation will be announced and whose
 *        profile will be exchanged by {@link #initializeRegion()}
 */
public CreateRegionProcessor(CacheDistributionAdvisee newRegion) {
this.newRegion = newRegion;
}
/**
 * Tells the other members that the region is being created and collects
 * their replies.
 * <p>
 * A CreateRegionMessage is sent to the members returned by
 * {@link #getRecipients()}. If there are no recipients the advisor and the
 * event tracker are marked initialized right away and nothing is sent. If
 * every recipient skipped its compatibility checks the message is sent
 * again, for at most five attempts in total (see
 * CreateRegionMessage#skipDuringInitialization).
 *
 * @throws IllegalStateException if a remote member reports that the region
 *         is incompatible with its own definition of the region
 */
public void initializeRegion() {
  // see CreateRegionMessage#skipDuringInitialization for why a retry may be needed
  final int maxAttempts = 5;
  for (int retry = 0; retry < maxAttempts; retry++) {
    Set recps = getRecipients();

    if (logger.isDebugEnabled()) {
      logger.debug("Creating region {}", this.newRegion);
    }

    if (recps.isEmpty()) {
      if (logger.isDebugEnabled()) {
        logger.debug("CreateRegionProcessor.initializeRegion, no recipients, msg not sent");
      }
      this.newRegion.getDistributionAdvisor().setInitialized();
      EventTracker tracker = ((LocalRegion) this.newRegion).getEventTracker();
      if (tracker != null) {
        tracker.setInitialized();
      }
      return;
    }

    CreateRegionReplyProcessor replyProc = new CreateRegionReplyProcessor(recps);

    boolean useMcast = false; // never use multicast for region meta-level ops (can cause hangs)
    CreateRegionMessage msg = getCreateRegionMessage(recps, replyProc, useMcast);

    // since PR buckets can be created during cache entry operations, enable
    // severe alert processing if we're creating one of them
    if (((LocalRegion) newRegion).isUsedForPartitionedRegionBucket()) {
      replyProc.enableSevereAlertProcessing();
      msg.severeAlertCompatible = true;
    }

    this.newRegion.getDistributionManager().putOutgoing(msg);
    // this was in a while() loop, which is incorrect use of a reply processor.
    // Reply procs are deregistered when they return from waitForReplies
    try {
      // Don't allow a region to be created if the distributed system is
      // disconnecting
      this.newRegion.getCache().getCancelCriterion().checkCancelInProgress(null);

      // This isn't right. We should disable region creation in general, not just
      // the remote case here...
      // // Similarly, don't allow new regions to be created if the cache is closing
      // GemFireCache cache = (GemFireCache)this.newRegion.getCache();
      // if (cache.isClosing()) {
      // throw new CacheClosedException("Cannot create a region when the cache is closing");
      // }
      try {
        replyProc.waitForRepliesUninterruptibly();
        if (!replyProc.needRetry()) {
          break;
        }
      } catch (ReplyException e) {
        Throwable t = e.getCause();
        if (t instanceof IllegalStateException) {
          // region is incompatible with region in another cache
          throw (IllegalStateException) t;
        }
        e.handleAsUnexpected();
        break;
      }
    } finally {
      // runs on every attempt, whether it ended normally, with break or with an exception
      replyProc.cleanup();
      EventTracker tracker = ((LocalRegion) this.newRegion).getEventTracker();
      if (tracker != null) {
        tracker.setInitialized();
      }
      if (((LocalRegion) this.newRegion).isUsedForPartitionedRegionBucket()) {
        if (logger.isDebugEnabled()) {
          logger.debug("initialized bucket event tracker: {}", tracker);
        }
      }
    }
  } // for

  // tell advisor that it has been initialized since a profile exchange occurred
  this.newRegion.getDistributionAdvisor().setInitialized();
}
/**
 * Determines which members must be told about the new region.
 *
 * @return for a root region (no parent advisee) the ids of all other
 *         distribution managers; otherwise the members advised by
 *         {@code getAdvice()}
 */
protected Set getRecipients() {
  if (this.newRegion.getParentAdvisee() != null) {
    // get recipients that have the parent region defined as distributed.
    return getAdvice();
  }
  // root region, all recipients
  return this.newRegion.getSystem().getDistributionManager().getOtherDistributionManagerIds();
}
/**
 * Asks the new region's cache distribution advisor for initial-image advice.
 *
 * @param previousAdvice advice from an earlier call, handed through to the advisor
 * @return the advice produced by the advisor
 */
public InitialImageAdvice getInitialImageAdvice(InitialImageAdvice previousAdvice) {
  CacheDistributionAdvisor advisor = this.newRegion.getCacheDistributionAdvisor();
  return advisor.adviseInitialImage(previousAdvice);
}
/**
 * Picks the recipients for a non-root advisee: a bucket region asks its
 * bucket advisor for profile-exchange advice, anything else asks the
 * distribution advisor of its parent advisee for generic advice.
 */
private Set getAdvice() {
  if (!(this.newRegion instanceof BucketRegion)) {
    DistributionAdvisee parentAdvisee = this.newRegion.getParentAdvisee();
    DistributionAdvisor parentAdvisor = parentAdvisee.getDistributionAdvisor();
    return parentAdvisor.adviseGeneric();
  }
  BucketRegion bucket = (BucketRegion) this.newRegion;
  return bucket.getBucketAdvisor().adviseProfileExchange();
}
/**
 * Builds the message that announces the new region.
 *
 * @param recps the members the message is addressed to
 * @param proc the reply processor whose id the replies must carry
 * @param useMcast whether the message is flagged for multicast
 * @return a message carrying the region's path, profile and
 *         concurrency-checks setting
 */
protected CreateRegionMessage getCreateRegionMessage(Set recps,
    ReplyProcessor21 proc, boolean useMcast) {
  final CreateRegionMessage createMsg = new CreateRegionMessage();

  // payload describing the region being created
  createMsg.regionPath = this.newRegion.getFullPath();
  createMsg.profile = (CacheProfile) this.newRegion.getProfile();
  createMsg.processorId = proc.getProcessorId();
  final boolean ccEnabled = this.newRegion.getAttributes().getConcurrencyChecksEnabled();
  createMsg.concurrencyChecksEnabled = ccEnabled;

  // addressing
  createMsg.setMulticast(useMcast);
  createMsg.setRecipients(recps);
  return createMsg;
}
/**
 * Does nothing in this implementation; the body is intentionally empty.
 *
 * @param target ignored
 */
public void setOnline(InternalDistributedMember target) {
}
/**
 * Reply processor for CreateRegionMessage. Each CreateRegionReplyMessage is
 * applied to the region being created (remote profile, bucket profiles,
 * event state, queued filter-profile messages) before it is handed to the
 * superclass.
 */
class CreateRegionReplyProcessor extends ReplyProcessor21 {
  /**
   * guards application of event state to the region so that we deserialize
   * and apply event state only once
   */
  private final Object eventStateLock = new Object();
  /** whether event state has been recorded in the region */
  private boolean eventStateRecorded = false;
  /** stays true until a reply reports that its sender did run the compatibility checks */
  private boolean allMembersSkippedChecks = true;

  /**
   * @param members the members a reply is expected from
   */
  CreateRegionReplyProcessor(Set members) {
    super((InternalDistributedSystem) CreateRegionProcessor.this.newRegion
        .getCache().getDistributedSystem(), members);
  }

  /**
   * true if all members skipped CreateRegionMessage#checkCompatibility(),
   * in which case CreateRegionMessage should be retried.
   */
  public boolean needRetry() {
    return this.allMembersSkippedChecks;
  }

  /**
   * Applies one reply to the region being created and then passes it to
   * the superclass, even if applying it threw an exception.
   *
   * @param msg the reply; must be a CreateRegionReplyMessage
   */
  @Override
  public void process(DistributionMessage msg) {
    Assert.assertTrue(msg instanceof CreateRegionReplyMessage,
        "CreateRegionProcessor is unable to process message of type " +
        msg.getClass());
    CreateRegionReplyMessage reply = (CreateRegionReplyMessage) msg;
    LocalRegion lr = (LocalRegion) newRegion;
    if (logger.isDebugEnabled()) {
      logger.debug("CreateRegionProcessor processing {}", msg);
    }
    try {
      if (reply.profile != null) {
        if (newRegion instanceof DistributedRegion) {
          DistributedRegion dr = (DistributedRegion) newRegion;
          if (!dr.getDataPolicy().withPersistence() && reply.profile.isPersistent) {
            dr.setGeneratedVersionTag(false);
          }
        }
        if (CreateRegionMessage.isLocalAccessor(newRegion) && reply.profile.isPersistent) {
          lr.enableConcurrencyChecks();
        }

        CacheDistributionAdvisor cda = CreateRegionProcessor.this.newRegion.getCacheDistributionAdvisor();
        cda.putProfile(reply.profile);
        if (reply.bucketProfiles != null) {
          RegionAdvisor ra = (RegionAdvisor) cda;
          ra.putBucketRegionProfiles(reply.bucketProfiles);
        }
        if (reply.eventState != null && lr.hasEventTracker()) {
          synchronized (eventStateLock) {
            // only the first reply that carries event state is recorded
            if (!this.eventStateRecorded) {
              this.eventStateRecorded = true;
              lr.recordEventState(reply.getSender(), (Map) reply.eventState);
            }
          }
        }
        // drop the reference once it has been looked at
        reply.eventState = null;
        if (lr.isUsedForPartitionedRegionBucket()) {
          ((BucketRegion) lr).updateEventSeqNum(reply.seqKeyForWan);
        }
        // Process any delta filter-profile messages received during profile
        // exchange.
        // The pending messages are queued in the local profile.
        FilterProfile remoteFP = reply.profile.filterProfile;
        if (remoteFP != null) {
          FilterProfile localFP = ((LocalRegion) newRegion).filterProfile;
          // localFP can be null and remoteFP not null when upgrading from 7.0.1.14 to 7.0.1.15
          if (localFP != null) {
            List messages = localFP.getQueuedFilterProfileMsgs(reply.getSender());
            // Thread init level is set since region is used during CQ registration.
            int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT);
            try {
              remoteFP.processQueuedFilterProfileMsgs(messages);
            } finally {
              LocalRegion.setThreadInitLevelRequirement(oldLevel);
              localFP.removeQueuedFilterProfileMsgs(reply.getSender());
            }
          }
        }
      }
      if (reply.destroyedId != null && newRegion instanceof DistributedRegion) {
        DistributedRegion dr = (DistributedRegion) newRegion;
        dr.getPersistenceAdvisor().removeMember(reply.destroyedId);
      }
      if (!reply.skippedCompatibilityChecks) {
        allMembersSkippedChecks = false;
      }
    } finally {
      // invoke super.process() even in case of exceptions (bug #41556)
      if (logger.isDebugEnabled()) {
        logger.debug("CreateRegionProcessor invoking super.process()");
      }
      super.process(msg);
    }
  }

  /**
   * IllegalStateException is an anticipated reply exception. Receiving
   * multiple replies with this exception is normal.
   */
  @Override
  protected boolean logMultipleExceptions() {
    return false;
  }
}
public static final class CreateRegionMessage extends HighPriorityDistributionMessage
implements MessageWithReply {
// --- state written by toData and read by fromData ---
/** concurrency-checks-enabled setting of the region being created */
public boolean concurrencyChecksEnabled;
/** full path of the region being created */
protected String regionPath;
/** profile of the region being created */
protected CacheProfile profile;
/** id of the reply processor on the sender; copied into the reply */
protected int processorId;
// --- state that toData does not write; the fields below (except
// severeAlertCompatible) are filled in while the message is processed and
// copied into the reply ---
/** set when checkCompatibility returned an error message */
private transient boolean incompatible = false;
/** exception to return to the sender, if any */
private transient ReplyException replyException;
/** local profile to return to the sender */
private transient CacheProfile replyProfile;
/** bucket profiles to return when the local region is a PartitionedRegion */
private transient ArrayList replyBucketProfiles;
/** event state to return when the local region is used for a partitioned region bucket */
private transient Object eventState;
/** value returned by isSevereAlertCompatible(); set by the sender for bucket regions */
protected transient boolean severeAlertCompatible;
/** set when compatibility checks were skipped because the local region is still initializing */
private transient boolean skippedCompatibilityChecks;
/** @return the id of the reply processor that the reply to this message must carry */
@Override
public int getProcessorId() {
return this.processorId;
}
/** @return the value of the {@code severeAlertCompatible} flag of this message */
@Override
public boolean isSevereAlertCompatible() {
return severeAlertCompatible;
}
/**
 * Always returns true.
 * NOTE(review): presumably this makes the messaging layer send the message
 * over the JGroups channel - confirm against DistributionMessage.
 */
@Override
public boolean sendViaJGroups() {
return true;
}
/**
 * Handles this create-region request on the receiving member.
 * <p>
 * Looks up the local region for {@link #regionPath} without waiting for it
 * to be initialized, lets handleCacheDistributionAdvisee check
 * compatibility and record the sender's profile, and always answers with a
 * CreateRegionReplyMessage from the finally block, even when the lookup or
 * the checks fail.
 *
 * @param dm the distribution manager used to find the cache and to send the reply
 */
@Override
protected void process(DistributionManager dm) {
int oldLevel = // Set thread local flag to allow entrance through initialization Latch
LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT);
LocalRegion lclRgn = null;
PersistentMemberID destroyedId = null;
try {
// get the region from the path, but do NOT wait on initialization,
// otherwise we could have a distributed deadlock
GemFireCacheImpl cache = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
//Fix for bug 42051 - Discover any regions that are in the process
//of being destroyed
DistributedRegion destroyingRegion = cache.getRegionInDestroy(this.regionPath);
if(destroyingRegion != null) {
destroyedId = destroyingRegion.getPersistentID();
}
lclRgn = (LocalRegion)cache.getRegion(this.regionPath);
if (lclRgn instanceof CacheDistributionAdvisee) {
// bug 37604 - don't return a profile if this is a bucket and the owner
// has been locally destroyed
if (lclRgn.isUsedForPartitionedRegionBucket()) {
if (!((BucketRegion)lclRgn).isPartitionedRegionOpen()) {
if (logger.isDebugEnabled()) {
logger.debug("<Partitioned Region Closed or Locally Destroyed> {}", this);
}
// the finally block below still sends a reply; replyProfile has not been set
return;
}
}
handleCacheDistributionAdvisee((CacheDistributionAdvisee)lclRgn,true);
}
else {
if (lclRgn == null) {
// check to see if a ProxyBucketRegion (not a true region) exists
handleCacheDistributionAdvisee(
PartitionedRegionHelper.getProxyBucketRegion(cache, this.regionPath, false),
false);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("<lclRgn scope is not distributed. Scope={}> {}", lclRgn.getAttributes().getScope(), this);
}
}
}
}
// The next three exception types are only logged at debug level; the reply
// is still sent from the finally block.
catch (PRLocallyDestroyedException fre) {
if (logger.isDebugEnabled()) {
logger.debug("<Region Locally Destroyed> {}", this);
}
}
catch (RegionDestroyedException e) {
if (logger.isDebugEnabled()) {
logger.debug("<RegionDestroyed> {}", this);
}
}
catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("<CancelException> {}", this);
}
}
catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
// any other failure is returned to the sender as the reply's exception,
// unless an exception has already been recorded
if (replyException == null) {
replyException = new ReplyException(t);
}
else {
logger.warn(LocalizedMessage.create(LocalizedStrings.CreateRegionProcessor_MORE_THAN_ONE_EXCEPTION_THROWN_IN__0, this), t);
}
}
finally {
// restore the init-level requirement saved at the top of the method
LocalRegion.setThreadInitLevelRequirement(oldLevel);
// build the reply from whatever state was collected above
CreateRegionReplyMessage replyMsg = new CreateRegionReplyMessage();
replyMsg.profile = replyProfile;
replyMsg.bucketProfiles = replyBucketProfiles;
replyMsg.eventState = this.eventState;
replyMsg.destroyedId = destroyedId;
replyMsg.setProcessorId(this.processorId);
replyMsg.setRecipient(this.getSender());
replyMsg.skippedCompatibilityChecks = this.skippedCompatibilityChecks;
if (lclRgn != null && lclRgn.isUsedForPartitionedRegionBucket()) {
replyMsg.seqKeyForWan = ((BucketRegion)lclRgn).getEventSeqNum().get();
}
if (replyException != null && !this.incompatible) {
// no need to log the exception if it was caused by compatibility check
if (logger.isDebugEnabled()) {
logger.debug("While processing '{}', got exception, returning to sender", this, replyException);
}
}
replyMsg.setException(replyException);
dm.putOutgoing(replyMsg);
// NOTE(review): presumably tells the sender about indexes defined on the
// partitioned region - confirm against PartitionedRegion.sendIndexCreationMsg
if(lclRgn instanceof PartitionedRegion)
((PartitionedRegion)lclRgn).sendIndexCreationMsg(this.getSender());
}
}
/**
 * Attempts to process this message with the specified
 * <code>CacheDistributionAdvisee</code>: checks compatibility (real
 * regions only), stores the sender's profile in the advisee's advisor and
 * collects the data to send back in the reply.
 *
 * @param cda the CacheDistributionAdvisee to apply this profile to; when
 *        null nothing is done
 * @param isRealRegion true if CacheDistributionAdvisee is a real region
 */
private void handleCacheDistributionAdvisee(
    CacheDistributionAdvisee cda, boolean isRealRegion) {
  if (cda == null) {
    // local region or proxy bucket region not found
    if (logger.isDebugEnabled()) {
      logger.debug("<lclRgn is null> {}", this); // matches old logging
    }
    return;
  }

  // only check compatibility if this advisee is a real region
  final String incompatibility = isRealRegion ? checkCompatibility(cda, this.profile) : null;
  if (incompatibility != null) {
    this.incompatible = true;
    if (logger.isDebugEnabled()) {
      logger.debug("{} <replyProfile not set because errorMsg={}", this, incompatibility);
    }
    this.replyException = new ReplyException(new IllegalStateException(incompatibility));
    return;
  }

  // if the new member is persistent, turn on concurrency checks
  // fixes bug 45208
  if (isRealRegion && isLocalAccessor(cda) && this.profile.isPersistent) {
    // #45934 need to set the generateVersionTag flag
    if (cda instanceof DistributedRegion) {
      DistributedRegion distributed = (DistributedRegion) cda;
      if (!distributed.getDataPolicy().withPersistence()) {
        distributed.setGeneratedVersionTag(false);
      }
    }
    assert cda instanceof LocalRegion;
    LocalRegion local = (LocalRegion) cda;
    local.enableConcurrencyChecks();
  }

  // #45934 don't add profile until the attributes are set correctly,
  // in particular enableConcurrencyChecks and generateVersionTag
  cda.getDistributionAdvisor().putProfile(this.profile);

  if (!isRealRegion) {
    return;
  }

  // only exchange profile if this advisee is a real region
  this.replyProfile = (CacheProfile) cda.getProfile();
  if (cda instanceof PartitionedRegion) {
    // partitioned region needs to also answer back all real bucket profiles
    PartitionedRegion partitioned = (PartitionedRegion) cda;
    this.replyBucketProfiles = partitioned.getRegionAdvisor().getBucketRegionProfiles();
  } else if (((LocalRegion) cda).isUsedForPartitionedRegionBucket()) {
    this.eventState = ((LocalRegion) cda).getEventState();
  }
}
/**
 * Checks whether the region described by the sender's profile is
 * compatible with the local definition of the same region.
 * <p>
 * The checks cover persistence, concurrency checks, gateway sender ids,
 * async event queue ids, partitioned vs. non-partitioned, scope and
 * off-heap. Several checks may fail; the message of the last failing check
 * is the one returned. As a side effect, {@code skippedCompatibilityChecks}
 * is set when the local region is still initializing.
 *
 * @param rgn the local advisee that already defines the region
 * @param profile the profile of the region being created remotely
 * @return an error message for the sender, or null if no incompatibility
 *         was found
 */
protected String checkCompatibility(CacheDistributionAdvisee rgn,
    CacheProfile profile) {
  Scope otherScope = rgn.getAttributes().getScope();
  String result = null;

  // Note that since the result string is sent back to the caller, the
  // 'other' and the 'my' values below are from the caller's point of view:
  // 'other' values describe the local region, 'my' values come from the
  // caller's profile. Every message names the caller's value first.
  final DistributedMember myId = rgn.getDistributionManager().getId();

  boolean otherCCEnabled = rgn.getAttributes().getConcurrencyChecksEnabled();
  boolean skipCheckForAccessor = skipCheckForAccessor(rgn, profile);
  boolean skipConcurrencyChecks = skipChecksForInternalRegion(rgn);
  boolean initializing = skipDuringInitialization(rgn);

  if (initializing) {
    this.skippedCompatibilityChecks = true;
  }

  if (!initializing && !skipCheckForAccessor && (rgn.getAttributes().getDataPolicy().withPersistence()
      != profile.dataPolicy.withPersistence())) {
    // 45186: Do not allow a persistent replicate to be started if a
    // non-persistent replicate is running
    if (!rgn.getAttributes().getDataPolicy().withPersistence()) {
      result = LocalizedStrings.CreateRegionProcessor_CANNOT_CREATE_REGION_0_WITH_PERSISTANCE_TRUE_PERSISTENT_MEMBERS_B4_NON_PERSISTENT
          .toLocalizedString(regionPath, myId);
    }
    // in the other case (local region is persistent) the new member is made
    // to turn on concurrency checks; either way the check below is skipped
    skipConcurrencyChecks = true;
  }

  if (!initializing && !skipCheckForAccessor && !skipConcurrencyChecks && this.concurrencyChecksEnabled != otherCCEnabled) {
    result = LocalizedStrings.CreateRegionProcessor_CANNOT_CREATE_REGION_0_CCENABLED_1_BECAUSE_ANOTHER_CACHE_HAS_THE_SAME_REGION_CCENABLED_2
        .toLocalizedString(new Object[] { regionPath,
            Boolean.valueOf(this.concurrencyChecksEnabled), myId, Boolean.valueOf(otherCCEnabled) });
  }

  Set<String> otherGatewaySenderIds = ((LocalRegion) rgn).getGatewaySenderIds();
  Set<String> myGatewaySenderIds = profile.gatewaySenderIds;
  if (!otherGatewaySenderIds.equals(myGatewaySenderIds)) {
    if (!rgn.getFullPath().contains(
        DynamicRegionFactoryImpl.dynamicRegionListName)) {
      // {1} is the region being created (the caller's ids), {2} the existing region
      result = LocalizedStrings.CreateRegionProcessor_CANNOT_CREATE_REGION_0_WITH_1_GATEWAY_SENDER_IDS_BECAUSE_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_GATEWAY_SENDER_IDS
          .toLocalizedString(new Object[] { this.regionPath,
              myGatewaySenderIds, otherGatewaySenderIds });
    }
  }

  Set<String> otherAsynEventQueueIds = ((LocalRegion) rgn).getAsyncEventQueueIds();
  Set<String> myAsyncEventQueueIds = profile.asyncEventQueueIds;
  if (!isLocalOrRemoteAccessor(rgn, profile) && !otherAsynEventQueueIds
      .equals(myAsyncEventQueueIds)) {
    // {1} is the region being created (the caller's ids), {2} the existing region
    result = LocalizedStrings.CreateRegionProcessor_CANNOT_CREATE_REGION_0_WITH_1_ASYNC_EVENT_IDS_BECAUSE_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_ASYNC_EVENT_IDS
        .toLocalizedString(new Object[] { this.regionPath,
            myAsyncEventQueueIds, otherAsynEventQueueIds });
  }

  final PartitionAttributes pa = rgn.getAttributes()
      .getPartitionAttributes();
  if (pa == null && profile.isPartitioned) {
    result = LocalizedStrings.CreateRegionProcessor_CANNOT_CREATE_PARTITIONEDREGION_0_BECAUSE_ANOTHER_CACHE_HAS_THE_SAME_REGION_DEFINED_AS_A_NON_PARTITIONEDREGION.toLocalizedString(this.regionPath, myId);
  } else if (pa != null && !profile.isPartitioned) {
    result = LocalizedStrings.CreateRegionProcessor_CANNOT_CREATE_THE_NON_PARTITIONEDREGION_0_BECAUSE_ANOTHER_CACHE_HAS_A_PARTITIONED_REGION_DEFINED_WITH_THE_SAME_NAME.toLocalizedString(this.regionPath, myId);
  } else if (profile.scope.isDistributed() && otherScope.isDistributed()) {
    // This check is somewhat unnecessary as all Partitioned Regions should have the same scope
    // due to the fact that Partitioned Regions do not support scope.
    if (profile.scope != otherScope) {
      result = LocalizedStrings.CreateRegionProcessor_CANNOT_CREATE_REGION_0_WITH_1_SCOPE_BECAUSE_ANOTHER_CACHE_HAS_SAME_REGION_WITH_2_SCOPE.toLocalizedString(new Object[] { this.regionPath, profile.scope, myId, otherScope });
    }
  }

  final boolean otherIsOffHeap = rgn.getAttributes().getOffHeap();

  // the local region has no storage of its own, so off-heap need not match
  final boolean thisIsRemoteAccessor =
      !rgn.getAttributes().getDataPolicy().withStorage() || (pa != null && pa.getLocalMaxMemory() == 0);

  if (!isRemoteAccessor(profile) && !thisIsRemoteAccessor && profile.isOffHeap != otherIsOffHeap) {
    result = LocalizedStrings.CreateRegionProcessor_CANNOT_CREATE_REGION_0_WITH_OFF_HEAP_EQUALS_1_BECAUSE_ANOTHER_CACHE_2_HAS_SAME_THE_REGION_WITH_OFF_HEAP_EQUALS_3
        .toLocalizedString(new Object[] { this.regionPath, profile.isOffHeap, myId, otherIsOffHeap });
  }

  if (logger.isDebugEnabled()) {
    logger.debug("CreateRegionProcessor.checkCompatibility: this={}; other={}; result={}", rgn, profile, result);
  }

  // NOTE: a disabled (commented-out) compatibility check based on membership
  // attributes, required roles and queued messages used to follow here. It
  // was dead code and has been dropped; see version control history if it
  // ever needs to be revived.

  return result;
}
/**
 * When many members are started concurrently, it is possible that an
 * accessor or non-version generating replicate receives CreateRegionMessage
 * before it is initialized, thus preventing persistent members from
 * starting. We skip compatibility checks if the region is not initialized,
 * and let other members check compatibility. If all members skip the
 * compatibility checks, then the CreateRegionMessage should be retried.
 * fixes #45186
 *
 * @return true if the region is a LocalRegion that is not yet initialized
 *         and there is at least one recipient other than the sender
 */
private boolean skipDuringInitialization(CacheDistributionAdvisee rgn) {
  if (!(rgn instanceof LocalRegion)) {
    return false;
  }
  if (((LocalRegion) rgn).isInitialized()) {
    return false;
  }
  // somebody other than the sender must remain to do the checks
  Set others = new CreateRegionProcessor(rgn).getRecipients();
  others.remove(getSender());
  return !others.isEmpty();
}
/**
 * For internal regions skip concurrency-checks-enabled checks, since we will
 * set it to true after profile exchange if required.
 *
 * @return true if the advisee is a LocalRegion that is an internal region
 */
private boolean skipChecksForInternalRegion(CacheDistributionAdvisee rgn) {
  return rgn instanceof LocalRegion && ((LocalRegion) rgn).isInternalRegion();
}
/**
 * check for isLocalOrRemoteAccessor(CacheDistributionAdvisee, CacheProfile)
 * and check if DistributedRegion does not generate entry versions.
 *
 * @return true if either of the two conditions holds
 */
private boolean skipCheckForAccessor(CacheDistributionAdvisee rgn, CacheProfile profile) {
  if (rgn instanceof DistributedRegion
      && !((DistributedRegion) rgn).getGenerateVersionTag()) {
    return true;
  }
  return isLocalOrRemoteAccessor(rgn, profile);
}
/**
 * @return true if profile being exchanged or region is an accessor
 * i.e has no storage
 */
protected static boolean isLocalOrRemoteAccessor(CacheDistributionAdvisee region,
    CacheProfile profile) {
  if (isLocalAccessor(region)) {
    return true;
  }
  return isRemoteAccessor(profile);
}
/**
 * @return true if the region's data policy has no storage, or if it has
 *         partition attributes with a local max memory of 0
 */
protected static boolean isLocalAccessor(CacheDistributionAdvisee region) {
  if (!region.getAttributes().getDataPolicy().withStorage()) {
    return true;
  }
  final PartitionAttributes partitionAttrs = region.getAttributes().getPartitionAttributes();
  return partitionAttrs != null && partitionAttrs.getLocalMaxMemory() == 0;
}
/**
 * @return true if the profile's data policy has no storage, or if the
 *         profile is partitioned with a local max memory of 0
 */
protected static boolean isRemoteAccessor(CacheProfile profile) {
  if (!profile.dataPolicy.withStorage()) {
    return true;
  }
  return profile.isPartitioned && ((PartitionProfile) profile).localMaxMemory == 0;
}
/**
 * Resets the superclass state and then clears the region path, the profile
 * and the processor id (set to -1).
 */
@Override
public void reset() {
  super.reset();
  this.processorId = -1;
  this.profile = null;
  this.regionPath = null;
}
/**
 * Reads this message from the stream. The fields are read in the order in
 * which toData writes them: superclass data, region path, profile,
 * processor id, concurrency-checks flag.
 */
@Override
public void fromData(DataInput in)
throws IOException, ClassNotFoundException {
super.fromData(in);
this.regionPath = DataSerializer.readString(in);
this.profile = (CacheProfile) DataSerializer.readObject(in);
this.processorId = in.readInt();
this.concurrencyChecksEnabled = in.readBoolean();
}
/**
 * @return {@code CREATE_REGION_MESSAGE}, the fixed id of this message type
 */
@Override
public int getDSFID() {
  return CREATE_REGION_MESSAGE;
}
/**
 * Writes this message to the stream: superclass data, region path, profile,
 * processor id, concurrency-checks flag. fromData reads the fields back in
 * the same order, so the two methods must be changed together.
 */
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
DataSerializer.writeString(this.regionPath, out);
DataSerializer.writeObject(this.profile, out);
out.writeInt(this.processorId);
out.writeBoolean(this.concurrencyChecksEnabled);
}
/**
 * @return a description listing the region path, processor id,
 *         concurrency-checks flag and profile of this message
 */
@Override
public String toString() {
  return new StringBuilder("CreateRegionMessage (region='")
      .append(this.regionPath)
      .append("'; processorId=")
      .append(this.processorId)
      .append("; concurrencyChecksEnabled=")
      .append(this.concurrencyChecksEnabled)
      .append("; profile=")
      .append(this.profile)
      .append(")")
      .toString();
}
}
/**
 * Reply to a CreateRegionMessage. Carries the replying member's profile
 * and, where applicable, its bucket profiles, event state and the
 * persistent id of a region that is being destroyed.
 */
public static final class CreateRegionReplyMessage extends ReplyMessage {
  /** profile of the replying member's region; may be null */
  protected CacheProfile profile;
  /** bucket profiles of the replying member; null when there are none */
  protected ArrayList bucketProfiles;
  /** event state of the replying member's region; may be null */
  protected Object eventState;
  /** Added to fix 42051. If the region is in the middle of being destroyed, return the destroyed profile */
  protected PersistentMemberID destroyedId;
  /** true if the replying member did not run its compatibility checks */
  protected boolean skippedCompatibilityChecks;
  long seqKeyForWan = -1;

  @Override
  public int getDSFID() {
    return CREATE_REGION_REPLY_MESSAGE;
  }

  @Override
  public boolean sendViaJGroups() {
    return true;
  }

  /**
   * Reads the reply in exactly the order in which toData writes it.
   */
  @Override
  public void fromData(DataInput in)
      throws IOException, ClassNotFoundException {
    super.fromData(in);

    final boolean hasProfile = in.readBoolean();
    if (hasProfile) {
      this.profile = (CacheProfile) DataSerializer.readObject(in);
    }

    final int bucketCount = in.readInt();
    if (bucketCount != 0) {
      this.bucketProfiles = new ArrayList(bucketCount);
      for (int remaining = bucketCount; remaining > 0; remaining--) {
        RegionAdvisor.BucketProfileAndId bucketProfile =
            new RegionAdvisor.BucketProfileAndId();
        InternalDataSerializer.invokeFromData(bucketProfile, in);
        this.bucketProfiles.add(bucketProfile);
      }
    } else {
      this.bucketProfiles = null;
    }

    final boolean hasEventState = in.readBoolean();
    if (hasEventState) {
      this.eventState = EventStateHelper.fromData(in, false);
    }

    final boolean hasDestroyedId = in.readBoolean();
    if (hasDestroyedId) {
      this.destroyedId = new PersistentMemberID();
      InternalDataSerializer.invokeFromData(this.destroyedId, in);
    }

    this.skippedCompatibilityChecks = in.readBoolean();
    this.seqKeyForWan = in.readLong();
  }

  /**
   * Writes the reply; every optional part is preceded by a presence flag
   * (or, for the bucket profiles, by their count).
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    super.toData(out);

    final boolean hasProfile = this.profile != null;
    out.writeBoolean(hasProfile);
    if (hasProfile) {
      DataSerializer.writeObject(this.profile, out);
    }

    if (this.bucketProfiles != null) {
      final int count = this.bucketProfiles.size();
      out.writeInt(count);
      for (int idx = 0; idx < count; idx++) {
        RegionAdvisor.BucketProfileAndId bucketProfile =
            (RegionAdvisor.BucketProfileAndId) this.bucketProfiles.get(idx);
        InternalDataSerializer.invokeToData(bucketProfile, out);
      }
    } else {
      out.writeInt(0);
    }

    final boolean hasEventState = this.eventState != null;
    out.writeBoolean(hasEventState);
    if (hasEventState) {
      //The isHARegion flag is false here because
      //we currently only include the event state in the profile
      //for bucket regions.
      EventStateHelper.toData(out, (Map) this.eventState, false);
    }

    final boolean hasDestroyedId = this.destroyedId != null;
    out.writeBoolean(hasDestroyedId);
    if (hasDestroyedId) {
      InternalDataSerializer.invokeToData(this.destroyedId, out);
    }

    out.writeBoolean(this.skippedCompatibilityChecks);
    out.writeLong(this.seqKeyForWan);
  }

  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("CreateRegionReplyMessage");
    text.append("(sender=").append(getSender());
    text.append("; processorId=").append(super.processorId);
    text.append("; profile=").append(this.profile);
    if (this.bucketProfiles != null) {
      text.append("; bucketProfiles=").append(this.bucketProfiles);
    }
    if (this.eventState != null) {
      text.append("; eventState=<not null>");
    }
    text.append("; skippedCompatibilityChecks=").append(this.skippedCompatibilityChecks);
    text.append("; seqKeyForWan=").append(this.seqKeyForWan);
    if (this.getException() != null) {
      text.append("; with exception {")
          .append(getException().getMessage())
          .append("}");
    }
    return text.append(")").toString();
  }
}
}