blob: 59c12ba25f1eee0a7731576700fe297eeaeb488b [file] [log] [blame]
/*
* =========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
* =========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.EntryDestroyedException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
import com.gemstone.gemfire.internal.cache.persistence.PRPersistentConfig;
import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/**
* A utility class for retrieving the regions of a colocation hierarchy in
* various scenarios.
*
* @author Yogesh Mahajan
* @author Kishor Bachhav
*
* @since 6.0
*/
public class ColocationHelper {
/** Logger used by this class for its debug and informational messages. */
private static final Logger logger = LogService.getLogger();
/**
 * Looks up the partitioned region that the given region is colocated with,
 * without waiting for that region's bucket metadata to be initialized.
 * <p>
 * Note that, despite its name, this method returns the region itself and not
 * the region's name.
 *
 * @param partitionedRegion
 *          the region whose colocated-with region is wanted; must not be null
 * @return the colocated PartitionedRegion; null when the region is not
 *         colocated with any region or when no colocated region could be
 *         resolved
 * @since cheetah
 */
public static PartitionedRegion getColocatedRegionName(
    final PartitionedRegion partitionedRegion) {
  Assert.assertTrue(partitionedRegion != null); // precondition1
  final String parentName = partitionedRegion.getPartitionAttributes().getColocatedWith();
  if (parentName == null) {
    // the region is not colocated with any region
    return null;
  }

  // Fast path: the region already holds a reference to a live parent.
  PartitionedRegion parent = partitionedRegion.getColocatedWithRegion();
  if (parent != null && !(parent.isLocallyDestroyed || parent.isDestroyed())) {
    return parent;
  }

  // Slow path: resolve the parent through its entry in the PR root region.
  final Region prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache());
  final PartitionRegionConfig parentConfig =
      (PartitionRegionConfig) prRoot.get(getRegionIdentifier(parentName));
  int prID = -1;
  try {
    if (parentConfig != null) {
      prID = parentConfig.getPRId();
      parent = PartitionedRegion.getPRFromId(prID);
      if (parent == null && prID > 0) {
        // The parent may not have called registerPartitionedRegion() yet, but
        // since prID is valid it can still be fetched from the cache by name.
        parent = getColocatedPR(partitionedRegion, parentName);
      }
    } else {
      parent = getColocatedPR(partitionedRegion, parentName);
    }
  } catch (PRLocallyDestroyedException e) {
    // Keep whatever value 'parent' held before the failed lookup.
    if (logger.isDebugEnabled()) {
      logger.debug("PRLocallyDestroyedException : Region with prId=" + prID
          + " is locally destroyed on this node", e);
    }
  }
  return parent;
}
/**
 * Fetches the colocated-with region from the cache by name, after logging an
 * informational message that names both regions.
 *
 * @param partitionedRegion
 *          the region that declares the colocation; supplies the cache
 * @param colocatedWith
 *          the name of the region it is colocated with
 * @return the region the cache returned for that name (asserted non-null when
 *         Java assertions are enabled)
 */
private static PartitionedRegion getColocatedPR(
    final PartitionedRegion partitionedRegion, final String colocatedWith) {
  final Object[] messageArgs = { partitionedRegion.getFullPath(), colocatedWith };
  logger.info(LocalizedMessage.create(
      LocalizedStrings.HOPLOG_0_COLOCATE_WITH_REGION_1_NOT_INITIALIZED_YET,
      messageArgs));

  final PartitionedRegion found = (PartitionedRegion) partitionedRegion.getCache()
      .getPartitionedRegion(colocatedWith, false);
  assert found != null;
  return found;
}
/**
 * Whether to ignore missing parallel queues on restart
 * if they are not attached to the region. See bug 50120.
 * <p>
 * Initialized from the boolean system property
 * {@code gemfire.IGNORE_UNRECOVERED_QUEUE}. Deliberately non-final: mutable
 * for tests.
 */
public static boolean IGNORE_UNRECOVERED_QUEUE = Boolean.getBoolean("gemfire.IGNORE_UNRECOVERED_QUEUE");
/**
 * Retrieves the partitioned region that the given region is colocated with,
 * waiting for that region's bucket metadata to be initialized before
 * returning it.
 *
 * @param partitionedRegion
 *          the region whose colocated-with region is wanted; must not be null
 * @return the colocated PartitionedRegion; null when the region is not
 *         colocated with any region, or when the colocated region turned out
 *         to be locally destroyed on this node
 * @throws IllegalStateException
 *           if the region named by the colocated-with attribute is not
 *           registered in the PR root region or has not been created on this
 *           node
 * @since 5.8Beta
 */
public static PartitionedRegion getColocatedRegion(
    final PartitionedRegion partitionedRegion) {
  Assert.assertTrue(partitionedRegion != null); // precondition1
  String colocatedWith = partitionedRegion.getPartitionAttributes()
      .getColocatedWith();
  if (colocatedWith == null) {
    // the region is not colocated with any region
    return null;
  }
  Region prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion
      .getCache());
  PartitionRegionConfig prConf = (PartitionRegionConfig) prRoot
      .get(getRegionIdentifier(colocatedWith));
  if (prConf == null) {
    // Fail with a diagnosable message instead of a bare NullPointerException.
    throw new IllegalStateException("Region specified in 'colocated-with' ("
        + colocatedWith + ") for region " + partitionedRegion.getFullPath()
        + " does not exist. It should be created before setting"
        + " 'colocated-with' attribute for this region.");
  }
  int prID = prConf.getPRId();
  PartitionedRegion colocatedPR = null;
  try {
    colocatedPR = PartitionedRegion.getPRFromId(prID);
    if (colocatedPR == null) {
      // The config exists but the region itself is not available locally.
      throw new IllegalStateException("Region specified in 'colocated-with' ("
          + colocatedWith + ") for region " + partitionedRegion.getFullPath()
          + " does not exist. It should be created before setting"
          + " 'colocated-with' attribute for this region.");
    }
    colocatedPR.waitOnBucketMetadataInitialization();
  }
  catch (PRLocallyDestroyedException e) {
    if (logger.isDebugEnabled()) {
      logger.debug("PRLocallyDestroyedException : Region with prId={} is locally destroyed on this node", prID, e);
    }
  }
  return colocatedPR;
}
/**
 * A utility to make sure that a member contains all of the partitioned
 * regions that are colocated with a given region on other members.
 * <p>
 * The check gathers the configuration of the given region, of every region
 * directly or transitively colocated with it (its descendants), and of every
 * region it is directly or transitively colocated with (its ancestors). The
 * member passes when it is listed in each of those configurations whose
 * colocation is complete, and no persisted child region is still offline.
 * <p>
 * TODO rebalance - this is rather inefficient, and probably all this junk should
 * be in the advisor.
 *
 * @param partitionedRegion
 *          the region whose colocation hierarchy is inspected
 * @param member
 *          the member to check
 * @return true if the member hosts the whole hierarchy; false otherwise,
 *         including when the region has no entry in the PR root region
 */
public static boolean checkMembersColocation(PartitionedRegion partitionedRegion, InternalDistributedMember member) {
  Region prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion
      .getCache());
  PartitionRegionConfig regionConfig = (PartitionRegionConfig) prRoot.get(partitionedRegion.getRegionIdentifier());
  //The region was probably concurrently destroyed
  if (regionConfig == null) {
    return false;
  }

  List<PartitionRegionConfig> colocatedRegions = new ArrayList<PartitionRegionConfig>();
  colocatedRegions.add(regionConfig);

  // Descendants, breadth-first. The list doubles as the work queue: every
  // config appended below is itself visited later by the index loop.
  for (int i = 0; i < colocatedRegions.size(); i++) {
    String parentPath = colocatedRegions.get(i).getFullPath();
    for (Object prKey : prRoot.keySet()) {
      PartitionRegionConfig candidate;
      try {
        candidate = (PartitionRegionConfig) prRoot.get(prKey);
      }
      catch (EntryDestroyedException ede) {
        continue;
      }
      if (candidate == null) {
        // darrel says: I'm seeing an NPE in this code after pr->rem
        // merge so I added this check and continue
        continue;
      }
      String candidateParent = candidate.getColocatedWith();
      // The colocated-with name may be stored with or without the leading '/'.
      if (candidateParent != null
          && (candidateParent.equals(parentPath)
              || ("/" + candidateParent).equals(parentPath))) {
        colocatedRegions.add(candidate);
      }
    }
  }

  // Ancestors. Add the parent config found at each step (not the config we
  // started the step from), so that the top-most region of the chain is
  // checked too and the starting region is not added a second time.
  PartitionRegionConfig current = regionConfig;
  while (true) {
    String colocatedWithRegionName = current.getColocatedWith();
    if (colocatedWithRegionName == null) {
      break;
    }
    PartitionRegionConfig parentConfig = (PartitionRegionConfig) prRoot
        .get(getRegionIdentifier(colocatedWithRegionName));
    if (parentConfig == null) {
      break;
    }
    colocatedRegions.add(parentConfig);
    current = parentConfig;
  }

  //Now check to make sure that all of the colocated regions
  //Have this member.
  //We don't need a hostname because the equals method doesn't check it.
  for (PartitionRegionConfig config : colocatedRegions) {
    if (config.isColocationComplete() && !config.containsMember(member)) {
      return false;
    }
  }

  //Check to make sure all of the persisted regions that are colocated
  //with this region have been created.
  return !hasOfflineColocatedChildRegions(partitionedRegion);
}
/**
 * Returns true if there are regions that are persisted on this member and
 * were previously colocated with the given region, but have not yet been created.
 * <p>
 * Every disk store of the region's cache is scanned for persisted partitioned
 * region configurations whose colocated-with value equals this region's full
 * path. A child that exists in the cache is searched recursively; a child
 * that does not exist counts as offline unless
 * {@code ignoreUnrecoveredQueue} says it can be skipped.
 *
 * @param region The parent region
 * @return true if there are any child regions that are persisted on this
 * member, but have not yet been created.
 */
private static boolean hasOfflineColocatedChildRegions(PartitionedRegion region) {
  // NOTE(review): presumably relaxes the init-level requirement so that
  // cache.getRegion() can see regions that are still initializing - confirm.
  // The previous level is always restored in the finally block.
  int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT);
  try {
    GemFireCacheImpl cache = region.getCache();
    String parentPath = region.getFullPath();
    Collection<DiskStoreImpl> stores = cache.listDiskStores();
    //Look through all of the disk stores for offline colocated child regions
    for (DiskStoreImpl diskStore : stores) {
      //Look at all of the partitioned regions.
      for (Map.Entry<String, PRPersistentConfig> entry : diskStore.getAllPRs().entrySet()) {
        //Check to see if they're colocated with this region.
        if (!parentPath.equals(entry.getValue().getColocatedWith())) {
          continue;
        }
        String childName = entry.getKey();
        PartitionedRegion childRegion = (PartitionedRegion) cache.getRegion(childName);
        if (childRegion == null) {
          //If the child region is offline, return true
          //unless it is a parallel queue that the user has removed.
          if (!ignoreUnrecoveredQueue(region, childName)) {
            return true;
          }
        } else if (hasOfflineColocatedChildRegions(childRegion)) {
          //Otherwise, look for offline children of that region.
          return true;
        }
      }
    }
  } finally {
    LocalRegion.setThreadInitLevelRequirement(oldLevel);
  }
  return false;
}
/**
 * Decides whether a missing child region may be ignored because it is a
 * parallel queue that is no longer attached to the parent region.
 * Hack for #50120.
 *
 * @param region
 *          the parent region
 * @param childName
 *          the name of the child region that could not be found
 * @return true only if the child is a parallel queue, its sender id is in
 *         neither the region's async event queue ids nor its parallel gateway
 *         sender ids, and {@code IGNORE_UNRECOVERED_QUEUE} is set
 */
private static boolean ignoreUnrecoveredQueue(PartitionedRegion region, String childName) {
  if (ParallelGatewaySenderQueue.isParallelQueue(childName)) {
    final String senderId = ParallelGatewaySenderQueue.getSenderId(childName);
    final boolean stillAttached = region.getAsyncEventQueueIds().contains(senderId)
        || region.getParallelGatewaySenderIds().contains(senderId);
    return !stillAttached && IGNORE_UNRECOVERED_QUEUE;
  }
  // Not a parallel queue: a missing child is never ignored.
  return false;
}
/**
 * Reports the colocation-complete flag stored in the PR root region for the
 * given region, i.e. whether the region has been created on all of the VMs
 * that host the regions this region is colocated with.
 *
 * @param region
 *          the region to look up
 * @return the flag from the region's configuration; false when the region has
 *         no entry in the PR root region
 */
public static boolean isColocationComplete(PartitionedRegion region) {
  final Region prRoot = PartitionedRegionHelper.getPRRoot(region.getCache());
  final PartitionRegionConfig rootEntry =
      (PartitionRegionConfig) prRoot.get(region.getRegionIdentifier());
  if (rootEntry != null) {
    return rootEntry.isColocationComplete();
  }
  //Fix for bug 40075. This call can race with the region being concurrently
  //destroyed, which is the only accepted reason for a missing entry.
  Assert.assertTrue(region.isDestroyed() || region.isClosed, "Region is not destroyed, but there is no entry in the prRoot for region " + region);
  return false;
}
/**
 * Collects all partitioned regions (excluding self) in the colocation chain
 * of the given region, keyed by full path.<br>
 * <p>
 * For example, shipmentPR is colocated with orderPR and orderPR is colocated
 * with customerPR <br>
 * <br>
 * getAllColocationRegions(customerPR) --> Map{orderPR, shipmentPR}<br>
 * getAllColocationRegions(orderPR) --> Map{customerPR, shipmentPR}<br>
 * getAllColocationRegions(shipmentPR) --> Map{customerPR, orderPR}<br>
 * <p>
 * Regions colocated with the given region (directly or transitively) are
 * waited on for bucket metadata initialization before being added; regions
 * the given region is colocated with are added without waiting.
 *
 * @param partitionedRegion
 *          the region whose colocation chain is wanted
 * @return map of region full path to partitioned region for every other
 *         region in the chain
 * @since 5.8Beta
 */
public static Map<String, PartitionedRegion> getAllColocationRegions(
    PartitionedRegion partitionedRegion) {
  final Map<String, PartitionedRegion> chain = new HashMap<String, PartitionedRegion>();

  // Downwards: breadth-first over the regions colocated with this one. The
  // list grows while it is walked, so an index loop is used instead of an
  // iterator.
  final List<PartitionedRegion> pending =
      new ArrayList<PartitionedRegion>(partitionedRegion.getColocatedByList());
  for (int next = 0; next < pending.size(); next++) {
    final PartitionedRegion descendant = pending.get(next);
    descendant.waitOnBucketMetadataInitialization();
    chain.put(descendant.getFullPath(), descendant);
    pending.addAll(descendant.getColocatedByList());
  }

  // Upwards: follow the colocated-with links until a region has none.
  for (PartitionedRegion ancestor = partitionedRegion.getColocatedWithRegion();
      ancestor != null;
      ancestor = ancestor.getColocatedWithRegion()) {
    chain.put(ancestor.getFullPath(), ancestor);
  }
  return chain;
}
/**
 * Gets the local data of every region colocated with the given region, as
 * provided by the function context.
 *
 * @param partitionedRegion
 *          the region whose colocated regions are looked up
 * @param context
 *          the function context used to obtain each region's local data set
 * @return map of region full path to that colocated region's local data set
 * @since 5.8Beta
 */
public static Map<String, Region> getAllColocatedLocalDataSets(
    PartitionedRegion partitionedRegion, InternalRegionFunctionContext context) {
  final Map<String, PartitionedRegion> colocated = getAllColocationRegions(partitionedRegion);
  final Map<String, Region> localDataByPath = new HashMap<String, Region>();
  for (Map.Entry<String, PartitionedRegion> each : colocated.entrySet()) {
    final Region colocatedRegion = each.getValue();
    localDataByPath.put(each.getKey(), context.getLocalDataSet(colocatedRegion));
  }
  return localDataByPath;
}
/**
 * Builds a LocalDataSet over the given buckets for the region itself and for
 * every region in its colocation chain.
 *
 * @param region
 *          the region to start from
 * @param bucketSet
 *          the bucket ids each LocalDataSet is created with
 * @return map of region full path to its LocalDataSet; always contains an
 *         entry for {@code region}
 */
public static Map<String, LocalDataSet> constructAndGetAllColocatedLocalDataSet(
    PartitionedRegion region, Set<Integer> bucketSet) {
  final Map<String, LocalDataSet> dataSets = new HashMap<String, LocalDataSet>();
  // Only walk the colocation chain when the region takes part in one.
  if (region.getColocatedWith() != null || region.isColocatedBy()) {
    for (PartitionedRegion colocated : ColocationHelper.getAllColocationRegions(region).values()) {
      dataSets.put(colocated.getFullPath(), new LocalDataSet(colocated, bucketSet));
    }
  }
  dataSets.put(region.getFullPath(), new LocalDataSet(region, bucketSet));
  return dataSets;
}
/**
 * Builds a LocalDataSet over the given buckets for every region in the
 * colocation chain of the given region, excluding the region itself.
 *
 * @param region
 *          the region to start from
 * @param bucketSet
 *          the bucket ids each LocalDataSet is created with
 * @return map of region full path to its LocalDataSet; an empty map when the
 *         region neither is colocated with another region nor has regions
 *         colocated with it
 */
public static Map<String, LocalDataSet> getColocatedLocalDataSetsForBuckets(PartitionedRegion region, Set<Integer> bucketSet) {
  if (region.getColocatedWith() != null || region.isColocatedBy()) {
    final Map<String, LocalDataSet> dataSets = new HashMap<String, LocalDataSet>();
    for (PartitionedRegion colocated : ColocationHelper.getAllColocationRegions(region).values()) {
      dataSets.put(colocated.getFullPath(), new LocalDataSet(colocated, bucketSet));
    }
    return dataSets;
  }
  return Collections.emptyMap();
}
/**
 * A utility method to retrieve all child partitioned regions that are
 * directly colocated to the specified partitioned region.<br>
 * <p>
 * For example, shipmentPR is colocated with orderPR and orderPR is colocated
 * with customerPR. <br>
 * getColocatedChildRegions(customerPR) will return List{orderPR}<br>
 * getColocatedChildRegions(orderPR) will return List{shipmentPR}<br>
 * getColocatedChildRegions(shipmentPR) will return empty List{}<br>
 * <p>
 * Each child is waited on for bucket metadata initialization before it is
 * added. Regions that turn out to be destroyed while being inspected are
 * skipped. The result is sorted by full path, with shadow regions placed
 * after all non-shadow regions.
 *
 * @param partitionedRegion
 *          the parent region
 * @return list of all child partitioned regions colocated with the region
 * @since 5.8Beta
 */
public static List<PartitionedRegion> getColocatedChildRegions(
    PartitionedRegion partitionedRegion) {
  List<PartitionedRegion> colocatedChildRegions = new ArrayList<PartitionedRegion>();
  Region prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion
      .getCache());
  Iterator itr = prRoot.keySet().iterator();
  while (itr.hasNext()) {
    // Scoped per iteration so that the catch blocks below can never report
    // a config left over from a previous key, or dereference a null one.
    String prName = null;
    PartitionRegionConfig prConf = null;
    try {
      prName = (String) itr.next();
      try {
        prConf = (PartitionRegionConfig) prRoot.get(prName);
      }
      catch (EntryDestroyedException ede) {
        continue;
      }
      if (prConf == null) {
        // darrel says: I'm seeing an NPE in this code after pr->rem
        // merge so I added this check and continue
        continue;
      }
      PartitionedRegion prRegion = PartitionedRegion.getPRFromId(prConf.getPRId());
      if (prRegion == null) {
        continue;
      }
      String childColocatedWith = prRegion.getColocatedWith();
      if (childColocatedWith == null) {
        continue;
      }
      String parentPath = partitionedRegion.getFullPath();
      // The colocated-with name may be stored with or without the leading '/'.
      if (childColocatedWith.equals(parentPath)
          || ("/" + childColocatedWith).equals(parentPath)) {
        // only regions directly colocatedWith partitionedRegion are
        // added to the list...
        prRegion.waitOnBucketMetadataInitialization();
        colocatedChildRegions.add(prRegion);
      }
    }
    catch (PRLocallyDestroyedException e) {
      if (logger.isDebugEnabled()) {
        // Report the PR id when the config was read, otherwise the root key.
        Object regionDesc = (prConf != null) ? (Object) Integer.valueOf(prConf.getPRId()) : prName;
        logger.debug("PRLocallyDestroyedException : Region ={} is locally destroyed on this node", regionDesc, e);
      }
    }
    catch (RegionDestroyedException e) {
      if (logger.isDebugEnabled()) {
        Object regionDesc = (prConf != null) ? (Object) Integer.valueOf(prConf.getPRId()) : prName;
        logger.debug("RegionDestroyedException : Region ={} is destroyed.", regionDesc, e);
      }
    }
  }
  //Fix for 44484 - Make the list of colocated child regions
  //is always in the same order on all nodes.
  Collections.sort(colocatedChildRegions, new Comparator<PartitionedRegion>() {
    @Override
    public int compare(PartitionedRegion o1, PartitionedRegion o2) {
      if (o1.isShadowPR() == o2.isShadowPR()) {
        return o1.getFullPath().compareTo(o2.getFullPath());
      }
      // Shadow regions sort after non-shadow regions.
      if (o1.isShadowPR()) {
        return 1;
      }
      return -1;
    }
  });
  return colocatedChildRegions;
}
//TODO why do we have this method here?
/**
 * Resolves a function reference to a Function instance.
 *
 * @param function
 *          either the id of a registered function (a String) or a Function
 *          object
 * @return the function registered under the given id, or the argument itself
 *         cast to Function when it is not a String
 */
public static Function getFunctionInstance(Serializable function) {
  if (!(function instanceof String)) {
    return (Function) function;
  }
  final Function registered = FunctionService.getFunction((String) function);
  Assert.assertTrue(registered != null, "Function " + function
      + " is not registered on this node ");
  return registered;
}
/**
 * Follows the colocated-with links upwards, using
 * {@code getColocatedRegion}, until a region with no colocated region is
 * reached.
 *
 * @param prRegion
 *          the region to start from
 * @return the top-most region of the chain; {@code prRegion} itself when it
 *         has no colocated region
 */
public static PartitionedRegion getLeaderRegion(PartitionedRegion prRegion) {
  PartitionedRegion leader = prRegion;
  for (PartitionedRegion parent = getColocatedRegion(leader);
      parent != null;
      parent = getColocatedRegion(leader)) {
    leader = parent;
  }
  return leader;
}
// Gemfirexd will skip initialization for PR, so just get region name without waitOnInitialize
/**
 * Follows the colocated-with links upwards, using
 * {@code getColocatedRegionName}, until a region with no colocated region is
 * reached. Despite the name, the region itself is returned.
 *
 * @param prRegion
 *          the region to start from
 * @return the top-most region of the chain; {@code prRegion} itself when it
 *         has no colocated region
 */
public static PartitionedRegion getLeaderRegionName(PartitionedRegion prRegion) {
  PartitionedRegion leader = prRegion;
  for (PartitionedRegion parent = getColocatedRegionName(leader);
      parent != null;
      parent = getColocatedRegionName(leader)) {
    leader = parent;
  }
  return leader;
}
/**
 * Converts a region name or path into the key form used for PR root lookups:
 * every '/' becomes '#', and a leading '#' is added when the name does not
 * already start with '/'.
 *
 * @param regionName
 *          region name, with or without a leading '/'
 * @return the identifier, e.g. "#a#b" for both "/a/b" and "a/b"
 */
private static String getRegionIdentifier(String regionName) {
  final boolean absolute = regionName.startsWith("/");
  final String hashed = regionName.replace("/", "#");
  return absolute ? hashed : "#" + hashed;
}
/**
 * Tests whether any region directly colocated with the given partitioned
 * region has a data policy with persistence.
 *
 * @param region
 *          the parent region
 * @return true as soon as one persistent child is found; false when there is
 *         none
 */
public static boolean hasPersistentChildRegion(
    PartitionedRegion region) {
  final Iterator<PartitionedRegion> children = getColocatedChildRegions(region).iterator();
  boolean foundPersistent = false;
  // Stop at the first persistent child.
  while (!foundPersistent && children.hasNext()) {
    foundPersistent = children.next().getDataPolicy().withPersistence();
  }
  return foundPersistent;
}
}