blob: 965f96ca59da8fa21db6d9743be76496e6fa52f2 [file] [log] [blame]
/*
* =========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
* =========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryDestroyedException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.EntryOperation;
import com.gemstone.gemfire.cache.FixedPartitionAttributes;
import com.gemstone.gemfire.cache.FixedPartitionResolver;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.PartitionResolver;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionExistsException;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache.util.CacheWriterAdapter;
import com.gemstone.gemfire.distributed.DistributedLockService;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.MembershipListener;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.partitioned.Bucket;
import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/**
* @author tapshank Created on Dec 21, 2005
*/
public class PartitionedRegionHelper
{
private static final Logger logger = LogService.getLogger();
// ///////////// All the final variable //////////////////
/** 1 MB */
static final long BYTES_PER_MB = 1024 * 1024;
/**
 * The administrative region used for storing Partitioned Region meta data sub
 * regions *
 */
public static final String PR_ROOT_REGION_NAME = "__PR";
/** Name of the DistributedLockService that PartitionedRegions used. */
public static final String PARTITION_LOCK_SERVICE_NAME = "__PRLS";
/** This is used to create bucket regions */
static final String BUCKET_REGION_PREFIX = "_B_";
/**
 * Time to wait for ownership (ms)
 * <p>
 * This should not be used normally. Internally, GemFire uses global locks to
 * modify shared meta-data and this property controls the delay before giving
 * up trying to acquire a global lock
 */
static final String VM_OWNERSHIP_WAIT_TIME_PROPERTY = "gemfire.VM_OWNERSHIP_WAIT_TIME";
/** Wait forever for ownership */
static final long VM_OWNERSHIP_WAIT_TIME_DEFAULT = Long.MAX_VALUE;
// NOTE(review): key name suggests this is the root-region key tracking the
// highest allocated PR id — confirm against callers elsewhere in the file.
static final String MAX_PARTITIONED_REGION_ID = "MAX_PARTITIONED_REGION_ID";
/** Pause between retry iterations (ms). */
public static final int DEFAULT_WAIT_PER_RETRY_ITERATION = 100; // milliseconds
/** Total retry budget: one hour (ms). */
public static final int DEFAULT_TOTAL_WAIT_RETRY_ITERATION =
    60 * 60 * 1000
    ; // milliseconds
public static final DataPolicy DEFAULT_DATA_POLICY = DataPolicy.PARTITION;
/** Immutable set of data policies a partitioned region may be created with. */
public static final Set ALLOWED_DATA_POLICIES;
/** Guards lookup/destruction of the partition DistributedLockService (see destroyLockService). */
static final Object dlockMonitor = new Object();
static {
  Set policies = new HashSet();
  policies.add(DEFAULT_DATA_POLICY);
  policies.add(DataPolicy.PERSISTENT_PARTITION);
  policies.add(DataPolicy.HDFS_PARTITION);
  policies.add(DataPolicy.HDFS_PERSISTENT_PARTITION);
  ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies);
}
/**
 * Cleans the config meta data for a failed or closed PartitionedRegion node,
 * acquiring and releasing the region lock while doing so.
 *
 * @param failedNode the failed PartitionedRegion Node
 * @param regionIdentifier the PartitionedRegion for which the cleanup is required
 * @param cache GemFire cache
 */
static void removeGlobalMetadataForFailedNode(Node failedNode,
    String regionIdentifier, GemFireCacheImpl cache) {
  // Delegate to the locking variant; metadata mutation requires the region lock.
  removeGlobalMetadataForFailedNode(failedNode, regionIdentifier, cache, true);
}
/**
 * This function is used for cleaning the config meta data for the failed or
 * closed PartitionedRegion node.
 *
 * @param failedNode
 *          The failed PartitionedRegion Node
 * @param regionIdentifier
 *          The PartitionedRegion for which the cleanup is required
 * @param cache
 *          GemFire cache.
 * @param lock
 *          True if this removal should acquire and release the RegionLock
 */
static void removeGlobalMetadataForFailedNode(Node failedNode,
    String regionIdentifier, GemFireCacheImpl cache, final boolean lock)
{
  Region root = PartitionedRegionHelper.getPRRoot(cache, false);
  if (root == null) {
    return; // no partitioned region info to clean up
  }
  // Quick unlocked pre-check: bail out if the node is already gone.
  PartitionRegionConfig prConfig = (PartitionRegionConfig)root
      .get(regionIdentifier);
  if (null == prConfig || !prConfig.containsNode(failedNode)) {
    return;
  }
  final PartitionedRegion.RegionLock rl = PartitionedRegion.getRegionLock(regionIdentifier, cache);
  try {
    if (lock) {
      rl.lock();
    }
    // Re-read under the lock; another member may have cleaned up already.
    prConfig = (PartitionRegionConfig)root.get(regionIdentifier);
    if ( prConfig != null && prConfig.containsNode(failedNode) ) {
      if(logger.isDebugEnabled()) {
        logger.debug("Cleaning up config for pr {} node {}", regionIdentifier, failedNode);
      }
      if ((prConfig.getNumberOfNodes() - 1) <= 0) {
        // failedNode was the last node: drop the whole config entry.
        if(logger.isDebugEnabled()) {
          logger.debug("No nodes left but failed node {} destroying entry {} nodes {}",
              failedNode, regionIdentifier, prConfig.getNodes());
        }
        try {
          root.destroy(regionIdentifier);
        } catch (EntryNotFoundException e) {
          logger.warn(LocalizedMessage.create(
              LocalizedStrings.PartitionedRegionHelper_GOT_ENTRYNOTFOUNDEXCEPTION_IN_DESTROY_OP_FOR_ALLPRREGION_KEY_0,
              regionIdentifier), e);
        }
      }
      else {
        prConfig.removeNode(failedNode);
        if(prConfig.getNumberOfNodes() == 0) {
          root.destroy(regionIdentifier);
        } else {
          // We can't go backwards, or we potentially lose data
          root.put(regionIdentifier, prConfig);
        }
      }
    }
  }
  finally {
    if (lock) {
      rl.unlock();
    }
  }
}
/**
 * Returns the region that is the root for all Partitioned Region metadata on
 * this node, creating it if it does not yet exist.
 */
public static LocalRegion getPRRoot(final Cache cache) {
  return getPRRoot(cache, /* createIfAbsent */ true);
}
/**
 * Return a region that is the root for all PartitionedRegion meta data on
 * this Node. The main administrative Regions contained within are
 * <code>allPartitionedRegion</code> (Scope DISTRIBUTED_ACK) and
 * <code>bucket2Node</code> (Scope DISTRIBUTED_ACK) and dataStore regions.
 *
 * @param cache the cache in which to find or create the root
 * @param createIfAbsent if false, return null rather than creating the root
 * @return a DISTRIBUTED_ACK scoped root region used for PartitionedRegion
 *         administration, or null when absent and createIfAbsent is false
 */
public static LocalRegion getPRRoot(final Cache cache, boolean createIfAbsent)
{
  GemFireCacheImpl gemCache = (GemFireCacheImpl) cache;
  DistributedRegion root = (DistributedRegion) gemCache.getRegion(PR_ROOT_REGION_NAME, true);
  if (root == null) {
    if (!createIfAbsent) {
      return null;
    }
    if(logger.isDebugEnabled()) {
      logger.debug("Creating root Partitioned Admin Region {}", PartitionedRegionHelper.PR_ROOT_REGION_NAME);
    }
    AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_ACK);
    factory.setDataPolicy(DataPolicy.REPLICATE);
    factory.addCacheListener(new FixedPartitionAttributesListener());
    if (Boolean.getBoolean("gemfire.PRDebug")) {
      // Debug-only listeners and writer that trace all meta-data traffic.
      factory.addCacheListener( new CacheListenerAdapter() {
        @Override
        public void afterCreate(EntryEvent event)
        {
          if (logger.isDebugEnabled()) {
            logger.debug("Create Event for allPR: key = {} oldVal = {} newVal = {} Op = {} origin = {} isNetSearch = {}",
                event.getKey(), event.getOldValue(), event.getNewValue(), event.getOperation(), event.getDistributedMember(),
                event.getOperation().isNetSearch());
          }
        }
        @Override
        public void afterUpdate(EntryEvent event)
        {
          if (logger.isDebugEnabled()) {
            logger.debug("Update Event for allPR: key = {} oldVal = {} newVal = {} Op = {} origin = {} isNetSearch = {}",
                event.getKey(), event.getOldValue(), event.getNewValue(), event.getOperation(), event.getDistributedMember(),
                event.getOperation().isNetSearch());
          }
        }
        @Override
        public void afterDestroy(EntryEvent event)
        {
          if (logger.isDebugEnabled()) {
            logger.debug("Destroy Event for allPR: key = {} oldVal = {} newVal = {} Op = {} origin = {} isNetSearch = {}",
                event.getKey(), event.getOldValue(), event.getNewValue(), event.getOperation(), event.getDistributedMember(),
                event.getOperation().isNetSearch());
          }
        }
      });
      factory.setCacheWriter(new CacheWriterAdapter() {
        @Override
        public void beforeUpdate(EntryEvent event) throws CacheWriterException
        {
          // the prConfig node list must advance (otherwise meta data becomes out of sync)
          final PartitionRegionConfig newConf = (PartitionRegionConfig) event.getNewValue();
          final PartitionRegionConfig oldConf = (PartitionRegionConfig) event.getOldValue();
          if (newConf != oldConf &&
              ! newConf.isGreaterNodeListVersion(oldConf) ) {
            throw new CacheWriterException(LocalizedStrings.PartitionedRegionHelper_NEW_PARTITIONEDREGIONCONFIG_0_DOES_NOT_HAVE_NEWER_VERSION_THAN_PREVIOUS_1.toLocalizedString(new Object[] {newConf, oldConf}));
          }
        }
      });
    }
    RegionAttributes ra = factory.create();
    // Create anonymous stats holder for Partitioned Region meta data
    final HasCachePerfStats prMetaStatsHolder = new HasCachePerfStats() {
      public CachePerfStats getCachePerfStats()
      {
        return new CachePerfStats(cache.getDistributedSystem(), "partitionMetaData");
      }
    };
    try {
      root = (DistributedRegion) gemCache.createVMRegion(PR_ROOT_REGION_NAME, ra,
          new InternalRegionArguments()
              .setIsUsedForPartitionedRegionAdmin(true)
              .setCachePerfStatsHolder(prMetaStatsHolder));
      root.getDistributionAdvisor().addMembershipListener(new MemberFailureListener());
    }
    catch (RegionExistsException silly) {
      // we avoid this before hand, but yet we have to catch it
      root = (DistributedRegion) gemCache.getRegion(PR_ROOT_REGION_NAME, true);
    } catch (IOException ieo) {
      Assert.assertTrue(false, "IOException creating Partitioned Region root: " + ieo);
    } catch (ClassNotFoundException cne) {
      Assert.assertTrue(false, "ClassNotFoundExcpetion creating Partitioned Region root: " + cne);
    }
  }
  Assert.assertTrue(root!=null, "Can not obtain internal Partitioned Region configuration root");
  return root;
}
//TODO rebalancing - this code was added here in the merge of -r22804:23093 from trunk
//because of changes made on trunk that require this method, which was removed on
//prRebalancing. It probably needs refactoring.
//The idea here is to remove meta data from the partitioned region for a node that
//has left the cache.
//A couple options that didn't work
// - remove metadata in region advisor for PR instead - this doesn't work because
//the a member can close it's cache and then recreate the same region. Another member
//might end up removing meta data after the region is recreated, leading to inconsistent metadata
// - remove metadata on cache closure in the member that is closing - This didn't work because
//we can't do region operations after isClosing is set to true (to remove metadata). Removing metadata
//before is closing is set to true results operations being silently ignored because of inconsistent metadata
//and regions.
/**
 * Clean the config meta data for a DistributedMember which has left the
 * DistributedSystem, one PartitionedRegion at a time.
 *
 * @param failedMemId the member that has left the distributed system
 */
public static void cleanUpMetaDataOnNodeFailure(DistributedMember failedMemId)
{
  try {
    final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
    if (cache == null || cache.getCancelCriterion().cancelInProgress() != null) {
      return; // cache gone or going down; cache close does its own cleanup
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Cleaning PartitionedRegion meta data for memberId={}", failedMemId);
    }
    Region rootReg = PartitionedRegionHelper.getPRRoot(cache, false);
    if (rootReg == null) {
      return; // no PR meta-data root on this member
    }
    // Shuffle so concurrent cleaners on different members do not all contend
    // for the same region's lock in the same order.
    final ArrayList<String> ks = new ArrayList<String>(rootReg.keySet());
    if (ks.size() > 1) {
      Collections.shuffle(ks, PartitionedRegion.rand);
    }
    for (String prName : ks) {
      try {
        cleanUpMetaDataForRegion(cache, prName, failedMemId, null);
      } catch (CancelException e) {
        // okay to ignore this - metadata will be cleaned up by cache close operation
      } catch (Exception e) {
        if (logger.isDebugEnabled()) {
          logger.debug("Got exception in cleaning up metadata. {}", e.getMessage(), e);
        }
      }
    }
  } catch (CancelException e) {
    // cache is closing; metadata will be cleaned up by the close itself
  }
}
/**
 * Removes the given failed member's Node from the named partitioned region's
 * configuration. The actual removal is scheduled on the PR meta-data cleanup
 * thread pool so the caller (which may hold an advisor lock) is not blocked.
 *
 * @param cache the cache
 * @param prName key of the partitioned region in the PR root region
 * @param failedMemId the member that has left
 * @param postCleanupTask run after cleanup completes, or immediately when no
 *          cleanup task was scheduled; may be null
 */
public static void cleanUpMetaDataForRegion(final GemFireCacheImpl cache,
    final String prName, final DistributedMember failedMemId,
    final Runnable postCleanupTask) {
  boolean runPostCleanUp = true;
  try {
    final PartitionRegionConfig prConf;
    Region rootReg = PartitionedRegionHelper.getPRRoot(cache, false);
    if (rootReg == null) {
      return;
    }
    try {
      prConf = (PartitionRegionConfig)rootReg.get(prName);
    }
    catch (EntryDestroyedException ede) {
      // config entry concurrently destroyed - nothing left to clean
      return;
    }
    if (prConf == null) {
      // darrel says: I'm seeing an NPE in this code after pr->rem
      // merge
      // so I added this check and continue
      return;
    }
    Set<Node> nodeList = prConf.getNodes();
    if (nodeList == null) {
      return;
    }
    for (final Node node1 : nodeList) {
      if (cache.getCancelCriterion().cancelInProgress() != null) {
        return;
      }
      if (node1.getMemberId().equals(failedMemId)) {
        //Do the cleanup in another thread so we don't have the advisor locked.
        //Fix for #45365, we don't schedule an asynchronous task until
        //we have determined the node to remove (Which includes the
        //serial number).
        cache.getDistributionManager().getPrMetaDataCleanupThreadPool().execute(new Runnable() {
          public void run() {
            cleanPartitionedRegionMetaDataForNode(cache,
                node1, prConf, prName);
            if(postCleanupTask != null) {
              postCleanupTask.run();
            }
          }
        });
        // The async task now owns invoking postCleanupTask.
        runPostCleanUp = false;
        return;
      }
    }
  } finally {
    if(runPostCleanUp && postCleanupTask != null) {
      postCleanupTask.run();
    }
  }
}
/**
 * Cleans the config meta data (both the configuration data and the buckets)
 * for a Node that hosted a PartitionedRegion.
 */
private static void cleanPartitionedRegionMetaDataForNode(GemFireCacheImpl cache, Node node,
    PartitionRegionConfig prConfig, String regionIdentifier)
{
  if (logger.isDebugEnabled()) {
    logger.debug("Cleaning PartitionedRegion meta data for node={} for Partitioned Region={} configuration={}",
        node, regionIdentifier, prConfig);
  }
  // The locking variant of the removal does the actual work.
  PartitionedRegionHelper.removeGlobalMetadataForFailedNode(node, regionIdentifier, cache);
  if (logger.isDebugEnabled()) {
    logger.debug("Done Cleaning PartitionedRegion meta data for memberId={} for {}", node, regionIdentifier);
  }
}
/**
* Runs hashCode() on given key producing a long value and then finds absolute
* value of the modulus with bucketSize. For better key distribution, possibly
* use MD5 or SHA or any unique ID generator for the hash function.
*
* @param pr
* the partitioned region on which to operate
* @param key
* the key on which to determine the hash key
* @return the bucket id the key hashes to
*/
// private static int NOSIGN = 0x7fffffff;
/* public static int getHashKey(PartitionedObject key)
{
PartitionedRegion pRegion = (PartitionedRegion)entryOp.getRegion();
RoutingResolver resolver = pRegion.getRoutingResolver();
int totalNumberOfBuckets = pRegion.getTotalNumberOfBuckets();
Object resolveKey = null;
if (resolver == null) {
resolveKey = key;
} else {
//resolveKey = resolver.getPartitionKey(key);
resolveKey = resolver.getRoutingObject((EntryOperation)key);
}
int hc = resolveKey.hashCode();
int bucketId = hc % totalNumberOfBuckets;
// Force positive bucket ids only
return Math.abs(bucketId);
// We should use the same hash code spreader as most other java.util hash tables.
// h += ~(h << 9);
// h ^= (h >>> 14);
// h += (h << 4);
// h ^= (h >>> 10);
// h &= NOSIGN;
// return h % totalNumberOfBuckets;
}
**/
/**
 * Locates the PartitionResolver for an operation, in priority order: the
 * region's configured resolver, then the key itself, then the callback
 * argument.
 *
 * @param pr the partitioned region
 * @param key the operation's key; may itself implement PartitionResolver
 * @param callbackArgument the operation's callback argument; may itself
 *          implement PartitionResolver
 * @return the resolver to use, or null when none is available
 */
static private PartitionResolver getResolver(PartitionedRegion pr,
    Object key, Object callbackArgument) {
  // First choice is one associated with the region
  PartitionResolver result = pr.getPartitionResolver();
  if (result != null) {
    return result;
  }
  // Second is the key (instanceof is false for null, so no null check needed)
  if (key instanceof PartitionResolver) {
    return (PartitionResolver)key;
  }
  // Third is the callback argument
  if (callbackArgument instanceof PartitionResolver) {
    return (PartitionResolver)callbackArgument;
  }
  // There is no resolver.
  return null;
}
/**
 * Computes the bucket id for the given key (or the routing object derived
 * from it by the region's PartitionResolver).
 *
 * @param pr the partitioned region on which to operate
 * @param operation operation
 * @param key the key on which to determine the hash key
 * @param callbackArgument the callbackArgument is passed to
 *          <code>PartitionResolver</code> to get Routing object
 * @return the bucket id the key/routing object hashes to
 */
public static int getHashKey(PartitionedRegion pr, Operation operation,
    Object key, Object value, Object callbackArgument) {
  try {
    // Pass a null event; the private overload builds an EntryOperation
    // lazily only when a resolver actually needs one.
    return getHashKey(null, pr, operation, key, value, callbackArgument);
  } catch (IllegalStateException e) {
    // bug #43651 - check for shutdown before throwing this
    pr.getCache().getCancelCriterion().checkCancelInProgress(e);
    throw e;
  }
}
/**
 * Computes the bucket id for the entry operation's key/routing object.
 *
 * @param event entry event created for this entry operation
 * @return the bucket id the key/routing object hashes to
 */
public static int getHashKey(EntryOperation event) {
  // Region, key and callback argument are all derived from the event.
  return getHashKey(event, null, null, null, null, null);
}
/**
 * Runs hashCode() on given key/routing object producing a long value and then
 * finds absolute value of the modulus with bucketSize. For better key
 * distribution, possibly use MD5 or SHA or any unique ID generator for the
 * hash function.
 *
 * @param event entry event created for this entry operation; can be null
 * @param pr the partitioned region on which to operate
 * @param operation operation
 * @param key the key on which to determine the hash key
 * @param callbackArgument the callbackArgument is passed to
 *          <code>PartitionResolver</code> to get Routing object
 * @return the bucket id the key/routing object hashes to
 */
private static int getHashKey(EntryOperation event, PartitionedRegion pr,
    Operation operation, Object key, Object value, Object callbackArgument) {
  // avoid creating EntryOperation if there is no resolver
  if (event != null) {
    // When an event is supplied it is the source of truth for region/key/callback.
    pr = (PartitionedRegion)event.getRegion();
    key = event.getKey();
    callbackArgument = event.getCallbackArgument();
  }
  PartitionResolver resolver = getResolver(pr, key, callbackArgument);
  Object resolveKey = null;
  if (pr.isFixedPartitionedRegion()) {
    String partition = null ;
    if (resolver instanceof FixedPartitionResolver) {
      Map<String, Integer[]> partitionMap = pr.getPartitionsMap();
      if (event == null) {
        // Build the EntryOperation lazily; the resolver needs one.
        event = new EntryOperationImpl(pr, operation, key, value,
            callbackArgument);
      }
      partition = ((FixedPartitionResolver)resolver).getPartitionName(
          event, partitionMap.keySet());
      if (partition == null) {
        Object[] prms = new Object[] { pr.getName(), resolver };
        throw new IllegalStateException(
            LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
                .toLocalizedString(prms));
      }
      Integer[] bucketArray = partitionMap.get(partition);
      if (bucketArray == null) {
        Object[] prms = new Object[] { pr.getName(), partition };
        throw new PartitionNotAvailableException(
            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_1_IS_NOT_AVAILABLE_ON_ANY_DATASTORE
                .toLocalizedString(prms));
      }
      // bucketArray holds { startingBucketID, numBuckets } (see assignFixedBucketId)
      int numBukets = bucketArray[1];
      // A single-bucket partition can hash on the partition name itself,
      // skipping the resolver's routing-object call.
      resolveKey = (numBukets == 1) ? partition : resolver.getRoutingObject(event);
    }
    else if (resolver == null) {
      throw new IllegalStateException(
          LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_RESOLVER_IS_NOT_AVAILABLE
              .toString(pr.getName()));
    }
    else if (!(resolver instanceof FixedPartitionResolver)) {
      Object[] prms = new Object[] { pr.getName(), resolver };
      throw new IllegalStateException(
          LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_RESOLVER_DEFINED_1_IS_NOT_AN_INSTANCE_OF_FIXEDPARTITIONRESOLVER
              .toLocalizedString(prms));
    }
    return assignFixedBucketId(pr, partition, resolveKey);
  }
  else {
    // Calculate resolveKey.
    if (resolver == null) {
      // no custom partitioning at all
      resolveKey = key;
      if (resolveKey == null) {
        throw new IllegalStateException("attempting to hash null");
      }
    }
    else {
      if (event == null) {
        event = new EntryOperationImpl(pr, operation, key, value,
            callbackArgument);
      }
      resolveKey = resolver.getRoutingObject(event);
      if (resolveKey == null) {
        throw new IllegalStateException(
            LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
                .toLocalizedString());
      }
    }
    // Finally, calculate the hash.
    return getHashKey(pr, resolveKey);
  }
}
/**
 * Computes the bucket id for a key within a fixed partition: the routing
 * object's hash is mapped into the partition's bucket range. The partition's
 * range is looked up first in the region's partitions map, then in the local
 * fixed-partition attributes, then in attributes advertised by remote members.
 *
 * @param pr the fixed partitioned region
 * @param partition the partition name returned by the FixedPartitionResolver
 * @param resolveKey the routing object (or partition name) to hash
 * @return the bucket id within the partition's bucket range
 * @throws IllegalStateException when the partition is known but configured
 *           with zero buckets
 * @throws PartitionNotAvailableException when no datastore defines the partition
 */
private static int assignFixedBucketId(PartitionedRegion pr,
    String partition, Object resolveKey) {
  int startingBucketID = 0;
  int partitionNumBuckets = 0;
  boolean isPartitionAvailable = pr.getPartitionsMap().containsKey(partition);
  Integer[] partitionDetails = pr.getPartitionsMap().get(partition);
  if (isPartitionAvailable) {
    // Known in the partitions map: { startingBucketID, numBuckets }.
    return fixedPartitionBucketId(partitionDetails[0], partitionDetails[1],
        resolveKey);
  }
  // Fall back to the fixed-partition attributes defined on this member...
  List<FixedPartitionAttributesImpl> localFPAs = pr
      .getFixedPartitionAttributesImpl();
  if (localFPAs != null) {
    for (FixedPartitionAttributesImpl fpa : localFPAs) {
      if (fpa.getPartitionName().equals(partition)) {
        isPartitionAvailable = true;
        partitionNumBuckets = fpa.getNumBuckets();
        startingBucketID = fpa.getStartingBucketID();
        break;
      }
    }
  }
  if (!isPartitionAvailable) {
    // ...then to attributes advertised by remote members.
    List<FixedPartitionAttributesImpl> remoteFPAs = pr.getRegionAdvisor()
        .adviseAllFixedPartitionAttributes();
    for (FixedPartitionAttributesImpl fpa : remoteFPAs) {
      if (fpa.getPartitionName().equals(partition)) {
        isPartitionAvailable = true;
        partitionNumBuckets = fpa.getNumBuckets();
        startingBucketID = fpa.getStartingBucketID();
        break;
      }
    }
  }
  if (partitionNumBuckets == 0) {
    if (isPartitionAvailable) {
      Object[] prms = new Object[] { pr.getName(), partition };
      throw new IllegalStateException(
          LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_FOR_PARTITION_1_PARTITIION_NUM_BUCKETS_ARE_SET_TO_0_BUCKETS_CANNOT_BE_CREATED_ON_THIS_MEMBER
              .toLocalizedString(prms));
    }
  }
  if (!isPartitionAvailable) {
    Object[] prms = new Object[] { pr.getName(), partition };
    throw new PartitionNotAvailableException(
        LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITION_NAME_1_IS_NOT_AVAILABLE_ON_ANY_DATASTORE
            .toLocalizedString(prms));
  }
  return fixedPartitionBucketId(startingBucketID, partitionNumBuckets,
      resolveKey);
}

/**
 * Maps resolveKey's hashCode into the range
 * [startingBucketID, startingBucketID + numBuckets).
 */
private static int fixedPartitionBucketId(int startingBucketID,
    int numBuckets, Object resolveKey) {
  int bucketId = Math.abs(resolveKey.hashCode() % numBuckets);
  int partitionBucketID = bucketId + startingBucketID;
  assert partitionBucketID != KeyInfo.UNKNOWN_BUCKET;
  return partitionBucketID;
}
/**
 * Computes the bucket id for a routing object within the given region.
 *
 * @param pr the partitioned region supplying the total bucket count
 * @param routingObject the object whose hash selects the bucket
 * @return the bucket id the routing object hashes to
 */
public static int getHashKey(PartitionedRegion pr, Object routingObject) {
  final int totalBuckets = pr.getTotalNumberOfBuckets();
  return getHashKey(routingObject, totalBuckets);
}
/**
 * Maps the routing object's hashCode onto [0, totalNumBuckets) by taking the
 * modulus and discarding the sign.
 *
 * @param routingObject the object whose hash selects the bucket; must not be null
 * @param totalNumBuckets the number of buckets to distribute over
 * @return a non-negative bucket id below totalNumBuckets
 */
public static int getHashKey(Object routingObject, int totalNumBuckets) {
  return Math.abs(routingObject.hashCode() % totalNumBuckets);
}
/**
 * Looks up a region by name and returns it only if it is a PartitionedRegion.
 *
 * @param prName region name to look up
 * @param cache the cache to search
 * @return the PartitionedRegion, or null when absent or of another type
 */
public static PartitionedRegion getPartitionedRegion(String prName,
    Cache cache)
{
  Region region = cache.getRegion(prName);
  // instanceof is false for null, so a missing region also yields null here.
  if (region instanceof PartitionedRegion) {
    return (PartitionedRegion)region;
  }
  return null;
}
/**
 * Returns true when the given full path names a bucket region (i.e. a bucket
 * name can be parsed from it).
 */
public static boolean isBucketRegion(String fullPath) {
  final String bucketName = getBucketName(fullPath);
  return bucketName != null;
}
/**
 * Find a ProxyBucketRegion by parsing the region fullPath
 *
 * @param cache the cache to search; may be null
 * @param fullPath full region path to parse
 * @param postInit true if caller should wait for bucket initialization to complete
 * @return ProxyBucketRegion as Bucket or null if not found
 * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed
 */
public static Bucket getProxyBucketRegion(Cache cache, String fullPath, boolean postInit) throws PRLocallyDestroyedException {
  if(cache == null) {
    //No cache
    return null;
  }
  // fullPath = /__PR/_B_1_10
  String bucketName = getBucketName(fullPath);
  if (bucketName == null) {
    return null;
  }
  // The PR's escaped path is embedded in the bucket name.
  String prid = getPRPath(bucketName);
  Region region;
  int oldLevel = // Set thread local flag to allow entrance through initialization Latch
      LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT);
  try {
    region = cache.getRegion(prid);
  } finally {
    // Always restore the caller's init-level requirement.
    LocalRegion.setThreadInitLevelRequirement(oldLevel);
  }
  if (region == null || !(region instanceof PartitionedRegion)) {
    return null;
  }
  PartitionedRegion pr = (PartitionedRegion) region;
  int bid = getBucketId(bucketName);
  RegionAdvisor ra = (RegionAdvisor) pr.getDistributionAdvisor();
  if (postInit) {
    return ra.getBucketPostInit(bid);
  }
  else if ( ! ra.areBucketsInitialized()) {
    // While the RegionAdvisor may be available, it's bucket meta-data may not be constructed yet
    return null;
  }
  else {
    return ra.getBucket(bid);
  }
}
private final static String BUCKET_FULL_PATH_PREFIX = PR_ROOT_REGION_NAME + Region.SEPARATOR + BUCKET_REGION_PREFIX;
/**
 * Get the bucket string by parsing the region fullPath
 *
 * @param bucketFullPath full region path to parse
 * @return the bucket string or null if no bucket string is present
 */
public static String getBucketName(String bucketFullPath) {
  if (bucketFullPath == null || bucketFullPath.length() == 0) {
    return null;
  }
  final int prefixIdx = bucketFullPath.indexOf(BUCKET_FULL_PATH_PREFIX);
  if (prefixIdx != -1) {
    // Skip past "__PR" and the separator, keeping the "_B_..." remainder.
    return bucketFullPath.substring(
        prefixIdx + PR_ROOT_REGION_NAME.length() + Region.SEPARATOR.length());
  }
  // No match; log only when a distributed system is up and debug is enabled.
  if (InternalDistributedSystem.getAnyInstance() != null
      && logger.isDebugEnabled()) {
    logger.debug("getBucketString no match fullPath={}", bucketFullPath);
  }
  return null;
}
/**
 * Builds the full path of a bucket region under the PR root, or returns null
 * when no bucket name can be derived.
 */
public static String getBucketFullPath(String prFullPath, int bucketId) {
  final String bucketName = getBucketName(prFullPath, bucketId);
  if (bucketName == null) {
    return null;
  }
  return Region.SEPARATOR + PR_ROOT_REGION_NAME + Region.SEPARATOR + bucketName;
}
/**
 * Escapes a full region path into a flat name: literal underscores are
 * doubled first, then separators become single underscores.
 */
public static String escapePRPath(String prFullPath) {
  return prFullPath.replace("_", "__")
      .replace(LocalRegion.SEPARATOR_CHAR, '_');
}
public static String TWO_SEPARATORS = LocalRegion.SEPARATOR + LocalRegion.SEPARATOR;
/**
 * Inverse of escapePRPath: underscores become separators, then what is now a
 * doubled separator is restored to a literal underscore.
 */
public static String unescapePRPath(String escapedPath) {
  return escapedPath.replace('_', LocalRegion.SEPARATOR_CHAR)
      .replace(TWO_SEPARATORS, "_");
}
/**
 * Builds the flat bucket region name: bucket prefix, escaped PR path,
 * name separator, then the bucket id.
 */
public static String getBucketName(String prPath, int bucketId) {
  StringBuilder name = new StringBuilder(PartitionedRegionHelper.BUCKET_REGION_PREFIX);
  name.append(PartitionedRegionHelper.escapePRPath(prPath));
  name.append(PartitionedRegion.BUCKET_NAME_SEPARATOR);
  name.append(bucketId);
  return name.toString();
}
/**
 * Returns the PR name given the bucketName (see getBucketName).
 */
public static String getPRPath(String bucketName) {
  // bucketName = _B_PRNAME_10
  final int start = PartitionedRegionHelper.BUCKET_REGION_PREFIX.length();
  final int end =
      bucketName.lastIndexOf(PartitionedRegion.BUCKET_NAME_SEPARATOR);
  Assert.assertTrue(end > -1,
      "getProxyBucketRegion failed on " + bucketName);
  return unescapePRPath(bucketName.substring(start, end));
}
/**
 * Returns the bucket id given the bucketName (see getBucketName).
 */
public static int getBucketId(String bucketName) {
  // bucketName = _B_PRNAME_10 ; the id is whatever follows the last separator.
  final int sepIdx =
      bucketName.lastIndexOf(PartitionedRegion.BUCKET_NAME_SEPARATOR);
  return Integer.parseInt(bucketName.substring(sepIdx + 1));
}
/**
 * This method returns true if the last region in provided fullPath is a
 * sub-region else it returns false. If fullPath is "/REGION1" it would return
 * false and if it is "/REGION1/REGION2", it would return true, which means
 * that Region2 is a sub-region.
 *
 * @param fullPath full path of the region
 * @return true if given full path has sub-regions else return false
 */
public static boolean isSubRegion(String fullPath)
{
  if (fullPath == null) {
    return false;
  }
  // A separator occurring after the leading one implies nesting.
  return fullPath.indexOf(Region.SEPARATOR, Region.SEPARATOR.length()) >= 0;
}
/**
 * This method returns true if the member is found in the membership list of
 * this member, else false.
 *
 * @param mem the member to look for
 * @param cache the cache whose distribution manager supplies the view
 * @return true if mem is found in membership list of this member.
 */
public static boolean isMemberAlive(DistributedMember mem, GemFireCacheImpl cache) {
  final Set liveMembers = getMembershipSet(cache);
  return liveMembers.contains(mem);
}
/**
 * Returns the current membership Set for this member.
 *
 * @param cache the cache whose distributed system is consulted
 * @return membership Set.
 */
public static Set getMembershipSet(GemFireCacheImpl cache) {
  final DM dm = cache.getDistributedSystem().getDistributionManager();
  return dm.getDistributionManagerIds();
}
/**
 * Utility method to print warning when nodeList in b2n region is found empty.
 * This will signify potential data loss scenario. Active only when the
 * "gemfire.PRDebug" system property is set.
 *
 * @param partitionedRegion the region being verified
 * @param bucketId Id of Bucket whose nodeList in b2n is empty.
 * @param callingMethod methodName of the calling method.
 */
public static void logForDataLoss(PartitionedRegion partitionedRegion, int bucketId, String callingMethod) {
  if (! Boolean.getBoolean("gemfire.PRDebug")) {
    return;
  }
  Region root = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache());
  PartitionRegionConfig prConfig = (PartitionRegionConfig)root
      .get(partitionedRegion.getRegionIdentifier());
  if( prConfig == null )
    return;
  Set members = partitionedRegion.getDistributionManager()
      .getDistributionManagerIds();
  // Three warnings: the empty node list, the PR config's node list, and the
  // current membership view, to aid post-mortem diagnosis.
  logger.warn(LocalizedMessage.create(
      LocalizedStrings.PartitionedRegionHelper_DATALOSS___0____SIZE_OF_NODELIST_AFTER_VERIFYBUCKETNODES_FOR_BUKID___1__IS_0,
      new Object[] {callingMethod, Integer.valueOf(bucketId)}));
  logger.warn(LocalizedMessage.create(
      LocalizedStrings.PartitionedRegionHelper_DATALOSS___0____NODELIST_FROM_PRCONFIG___1,
      new Object[] {callingMethod, printCollection(prConfig.getNodes())}));
  logger.warn(LocalizedMessage.create(
      LocalizedStrings.PartitionedRegionHelper_DATALOSS___0____CURRENT_MEMBERSHIP_LIST___1,
      new Object[] {callingMethod, printCollection(members)}));
}
/**
 * Formats a collection as "[e1, e2, ...]"; returns "[null]" for a null
 * collection.
 *
 * @param c the collection to print; may be null
 * @return the bracketed, comma-separated rendering
 */
public static String printCollection(Collection c) {
  if (c == null) {
    return "[null]";
  }
  // StringBuilder: no caller shares this buffer, so StringBuffer's
  // synchronization is unnecessary overhead.
  StringBuilder sb = new StringBuilder("[");
  Iterator itr = c.iterator();
  while (itr.hasNext()) {
    sb.append(itr.next());
    if (itr.hasNext()) {
      sb.append(", ");
    }
  }
  return sb.append("]").toString();
}
/**
 * Destroys and removes the partition distributed lock service, if it exists.
 * Called from the cache-close path.
 *
 * @see PartitionedRegion#afterRegionsClosedByCacheClose(GemFireCacheImpl)
 */
static void destroyLockService() {
  DistributedLockService service;
  synchronized (dlockMonitor) {
    service = DistributedLockService.getServiceNamed(PARTITION_LOCK_SERVICE_NAME);
  }
  if (service == null) {
    return;
  }
  try {
    DistributedLockService.destroy(PARTITION_LOCK_SERVICE_NAME);
  } catch (IllegalArgumentException ignored) {
    // The service was already destroyed, probably by a concurrent
    // thread racing us between the lookup above and the destroy.
  }
}
/**
 * @param buk the bucket to inspect
 * @return true if this member's advisor reports it as primary for {@code buk}
 */
public static boolean isBucketPrimary(Bucket buk) {
  BucketAdvisor advisor = buk.getBucketAdvisor();
  return advisor.isPrimary();
}
/**
 * Checks whether any remote member already hosts the primary for the same
 * fixed partition as {@code fpa}.
 *
 * @param region the fixed partitioned region
 * @param fpa the fixed partition attributes to match against remote members
 * @return true if some remote member reports a primary for this partition
 */
public static boolean isRemotePrimaryAvailable(PartitionedRegion region,
    FixedPartitionAttributesImpl fpa) {
  List<FixedPartitionAttributesImpl> sameFpas =
      region.getRegionAdvisor().adviseSameFPAs(fpa);
  for (Iterator<FixedPartitionAttributesImpl> it = sameFpas.iterator(); it.hasNext();) {
    FixedPartitionAttributes candidate = it.next();
    if (candidate.isPrimary()) {
      return true;
    }
  }
  return false;
}
/**
 * Locates the fixed partition that owns the given bucket, searching the
 * local FPAs first and then those advised by remote members.
 *
 * @param pr the fixed partitioned region
 * @param bucketId the bucket to resolve
 * @return the fixed partition attributes covering {@code bucketId}
 * @throws PartitionNotAvailableException if no datastore hosts a fixed
 *         partition for the bucket
 */
public static FixedPartitionAttributesImpl getFixedPartitionAttributesForBucket(
    PartitionedRegion pr, int bucketId) {
  FixedPartitionAttributesImpl match =
      findBucketOwner(pr.getFixedPartitionAttributesImpl(), bucketId);
  if (match == null) {
    match = findBucketOwner(
        pr.getRegionAdvisor().adviseAllFixedPartitionAttributes(), bucketId);
  }
  if (match != null) {
    return match;
  }
  throw new PartitionNotAvailableException(
      LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_IS_NOT_AVAILABLE_FOR_BUCKET_1_ON_ANY_DATASTORE
          .toLocalizedString(new Object[] { pr.getName(), Integer.valueOf(bucketId) }));
}

/** Returns the first FPA in {@code fpas} containing {@code bucketId}, or null. */
private static FixedPartitionAttributesImpl findBucketOwner(
    List<FixedPartitionAttributesImpl> fpas, int bucketId) {
  if (fpas == null) {
    return null;
  }
  for (FixedPartitionAttributesImpl fpa : fpas) {
    if (fpa.hasBucket(bucketId)) {
      return fpa;
    }
  }
  return null;
}
/**
 * Collects the names of every fixed partition known locally or advised by
 * remote members.
 *
 * @param region the fixed partitioned region
 * @return an unmodifiable set of all known partition names
 */
private static Set<String> getAllAvailablePartitions(PartitionedRegion region) {
  Set<String> names = new HashSet<String>();
  List<FixedPartitionAttributesImpl> localFpas =
      region.getFixedPartitionAttributesImpl();
  if (localFpas != null) {
    for (FixedPartitionAttributes fpa : localFpas) {
      names.add(fpa.getPartitionName());
    }
  }
  for (FixedPartitionAttributesImpl fpa : region.getRegionAdvisor()
      .adviseAllFixedPartitionAttributes()) {
    names.add(fpa.getPartitionName());
  }
  return Collections.unmodifiableSet(names);
}
/**
 * Gathers the fixed partition attributes defined on this member together
 * with those advised by all remote members.
 *
 * @param region the fixed partitioned region
 * @return a mutable set containing every known FPA
 */
public static Set<FixedPartitionAttributes> getAllFixedPartitionAttributes(PartitionedRegion region) {
  Set<FixedPartitionAttributes> result = new HashSet<FixedPartitionAttributes>();
  List<FixedPartitionAttributesImpl> localFpas =
      region.getFixedPartitionAttributesImpl();
  if (localFpas != null) {
    result.addAll(localFpas);
  }
  result.addAll(region.getRegionAdvisor().adviseAllFixedPartitionAttributes());
  return result;
}
private static class MemberFailureListener implements MembershipListener {
public void memberJoined(InternalDistributedMember id) {
}
public void memberDeparted(final InternalDistributedMember id, boolean crashed) {
PartitionedRegionHelper.cleanUpMetaDataOnNodeFailure(id);
}
public void memberSuspect(InternalDistributedMember id,
InternalDistributedMember whoSuspected) {
}
public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
}
}
}
class FixedPartitionAttributesListener extends CacheListenerAdapter {
private static final Logger logger = LogService.getLogger();
public void afterCreate(EntryEvent event) {
PartitionRegionConfig prConfig = (PartitionRegionConfig)event.getNewValue();
if (!prConfig.getElderFPAs().isEmpty()) {
updatePartitionMap(prConfig);
}
}
public void afterUpdate(EntryEvent event) {
PartitionRegionConfig prConfig = (PartitionRegionConfig)event.getNewValue();
if (!prConfig.getElderFPAs().isEmpty()) {
updatePartitionMap(prConfig);
}
}
private void updatePartitionMap(PartitionRegionConfig prConfig) {
int prId = prConfig.getPRId();
PartitionedRegion pr = null;
try {
pr = PartitionedRegion.getPRFromId(prId);
if (pr != null) {
Map<String, Integer[]> partitionMap = pr.getPartitionsMap();
for (FixedPartitionAttributesImpl fxPrAttr : prConfig.getElderFPAs()) {
partitionMap.put(fxPrAttr.getPartitionName(), new Integer[] {
fxPrAttr.getStartingBucketID(), fxPrAttr.getNumBuckets() });
}
}
} catch (PRLocallyDestroyedException e) {
logger.debug("PRLocallyDestroyedException : Region ={} is locally destroyed on this node", prConfig.getPRId(), e);
}
}
}