/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.ANY_INIT;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryDestroyedException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.partition.PartitionNotAvailableException;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.util.CacheWriterAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.logging.internal.log4j.api.LogService;
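/**
 * Static helper methods for PartitionedRegion administration: metadata cleanup for nodes that
 * fail or close, bucket id computation, and bucket region naming.
 */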
public class PartitionedRegionHelper {
private static final Logger logger = LogService.getLogger();
/** 1 MB */
static final long BYTES_PER_MB = 1024 * 1024;
/**
 * The administrative region used for storing partitioned region metadata sub-regions.
*/
public static final String PR_ROOT_REGION_NAME = "__PR";
  /** Name of the DistributedLockService that PartitionedRegions use. */
public static final String PARTITION_LOCK_SERVICE_NAME = "__PRLS";
/** This is used to create bucket regions */
static final String BUCKET_REGION_PREFIX = "_B_";
/**
* Time to wait for ownership (ms)
* <p>
   * This should not normally be used. Internally, GemFire uses global locks to modify shared
   * metadata, and this property controls how long to keep trying to acquire such a lock before
   * giving up.
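   * <p>
   * For example, a test might shorten the wait by setting the JVM system property (the default
   * waits forever):
   *
   * <pre>
   * -Dgemfire.VM_OWNERSHIP_WAIT_TIME=60000
   * </pre>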
*/
static final String VM_OWNERSHIP_WAIT_TIME_PROPERTY =
DistributionConfig.GEMFIRE_PREFIX + "VM_OWNERSHIP_WAIT_TIME";
/** Wait forever for ownership */
static final long VM_OWNERSHIP_WAIT_TIME_DEFAULT = Long.MAX_VALUE;
static final String MAX_PARTITIONED_REGION_ID = "MAX_PARTITIONED_REGION_ID";
public static final int DEFAULT_WAIT_PER_RETRY_ITERATION = 100; // milliseconds
public static final int DEFAULT_TOTAL_WAIT_RETRY_ITERATION = 60 * 60 * 1000; // milliseconds
@Immutable
public static final DataPolicy DEFAULT_DATA_POLICY = DataPolicy.PARTITION;
@Immutable
  public static final Set<DataPolicy> ALLOWED_DATA_POLICIES;
static final Object dlockMonitor = new Object();
static {
    Set<DataPolicy> policies = new HashSet<>();
policies.add(DEFAULT_DATA_POLICY);
policies.add(DataPolicy.PERSISTENT_PARTITION);
// policies.add(DataPolicy.NORMAL);
ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies);
}
/**
   * Cleans the config metadata for a failed or closed PartitionedRegion node.
*
* @param failedNode The failed PartitionedRegion Node
* @param regionIdentifier The PartitionedRegion for which the cleanup is required
* @param cache GemFire cache.
*/
static void removeGlobalMetadataForFailedNode(Node failedNode, String regionIdentifier,
InternalCache cache) {
removeGlobalMetadataForFailedNode(failedNode, regionIdentifier, cache, true);
}
/**
   * Cleans the config metadata for a failed or closed PartitionedRegion node.
*
* @param failedNode The failed PartitionedRegion Node
* @param regionIdentifier The PartitionedRegion for which the cleanup is required
* @param cache GemFire cache.
* @param lock True if this removal should acquire and release the RegionLock
*/
static void removeGlobalMetadataForFailedNode(Node failedNode, String regionIdentifier,
InternalCache cache, final boolean lock) {
Region root = PartitionedRegionHelper.getPRRoot(cache, false);
if (root == null) {
return; // no partitioned region info to clean up
}
PartitionRegionConfig prConfig = (PartitionRegionConfig) root.get(regionIdentifier);
if (null == prConfig || !prConfig.containsNode(failedNode)) {
return;
}
final PartitionedRegion.RegionLock rl =
PartitionedRegion.getRegionLock(regionIdentifier, cache);
try {
if (lock) {
rl.lock();
}
prConfig = (PartitionRegionConfig) root.get(regionIdentifier);
if (prConfig != null && prConfig.containsNode(failedNode)) {
if (logger.isDebugEnabled()) {
logger.debug("Cleaning up config for pr {} node {}", regionIdentifier, failedNode);
}
if ((prConfig.getNumberOfNodes() - 1) <= 0) {
if (logger.isDebugEnabled()) {
logger.debug("No nodes left but failed node {} destroying entry {} nodes {}",
failedNode, regionIdentifier, prConfig.getNodes());
}
try {
root.destroy(regionIdentifier);
} catch (EntryNotFoundException e) {
logger.warn(
String.format("Got EntryNotFoundException in destroy Op for allPRRegion key, %s",
regionIdentifier),
e);
}
} else {
prConfig.removeNode(failedNode);
if (prConfig.getNumberOfNodes() == 0) {
root.destroy(regionIdentifier);
} else {
// We can't go backwards, or we potentially lose data
root.put(regionIdentifier, prConfig);
}
}
}
} finally {
if (lock) {
rl.unlock();
}
}
}
/**
* Return a region that is the root for all Partitioned Region metadata on this node
*/
public static LocalRegion getPRRoot(final InternalCache cache) {
return getPRRoot(cache, true);
}
/**
   * Return a region that is the root for all PartitionedRegion metadata on this node. The main
   * administrative regions contained within are <code>allPartitionedRegion</code> (Scope
   * DISTRIBUTED_ACK) and <code>bucket2Node</code> (Scope DISTRIBUTED_ACK), as well as the data
   * store regions.
   *
   * @return a DISTRIBUTED_ACK scoped root region used for PartitionedRegion administration
*/
public static LocalRegion getPRRoot(final InternalCache cache, boolean createIfAbsent) {
DistributedRegion root = (DistributedRegion) cache.getRegion(PR_ROOT_REGION_NAME, true);
if (root == null) {
if (!createIfAbsent) {
return null;
}
if (logger.isDebugEnabled()) {
logger.debug("Creating root Partitioned Admin Region {}",
PartitionedRegionHelper.PR_ROOT_REGION_NAME);
}
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.addCacheListener(new FixedPartitionAttributesListener());
if (Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PRDebug")) {
factory.addCacheListener(new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
if (logger.isDebugEnabled()) {
logger.debug(
"Create Event for allPR: key = {} oldVal = {} newVal = {} Op = {} origin = {} isNetSearch = {}",
event.getKey(), event.getOldValue(), event.getNewValue(), event.getOperation(),
event.getDistributedMember(), event.getOperation().isNetSearch());
}
}
@Override
public void afterUpdate(EntryEvent event) {
if (logger.isDebugEnabled()) {
logger.debug(
"Update Event for allPR: key = {} oldVal = {} newVal = {} Op = {} origin = {} isNetSearch = {}",
event.getKey(), event.getOldValue(), event.getNewValue(), event.getOperation(),
event.getDistributedMember(), event.getOperation().isNetSearch());
}
}
@Override
public void afterDestroy(EntryEvent event) {
if (logger.isDebugEnabled()) {
logger.debug(
"Destroy Event for allPR: key = {} oldVal = {} newVal = {} Op = {} origin = {} isNetSearch = {}",
event.getKey(), event.getOldValue(), event.getNewValue(), event.getOperation(),
event.getDistributedMember(), event.getOperation().isNetSearch());
}
}
});
factory.setCacheWriter(new CacheWriterAdapter() {
@Override
public void beforeUpdate(EntryEvent event) throws CacheWriterException {
// the prConfig node list must advance (otherwise meta data becomes out of sync)
final PartitionRegionConfig newConf = (PartitionRegionConfig) event.getNewValue();
final PartitionRegionConfig oldConf = (PartitionRegionConfig) event.getOldValue();
if (newConf != oldConf && !newConf.isGreaterNodeListVersion(oldConf)) {
throw new CacheWriterException(
String.format(
"New PartitionedRegionConfig %s does not have newer version than previous %s",
new Object[] {newConf, oldConf}));
}
}
});
}
RegionAttributes ra = factory.create();
// Create anonymous stats holder for Partitioned Region meta data
final HasCachePerfStats prMetaStatsHolder = new HasCachePerfStats() {
@Override
public CachePerfStats getCachePerfStats() {
return new CachePerfStats(cache.getDistributedSystem(), "RegionStats-partitionMetaData",
cache.getStatisticsClock());
}
};
try {
root = (DistributedRegion) cache.createVMRegion(PR_ROOT_REGION_NAME, ra,
new InternalRegionArguments().setIsUsedForPartitionedRegionAdmin(true)
.setInternalRegion(true).setCachePerfStatsHolder(prMetaStatsHolder));
root.getDistributionAdvisor().addMembershipListener(new MemberFailureListener(cache));
} catch (RegionExistsException ignore) {
        // we try to avoid this beforehand, but we still have to catch it
root = (DistributedRegion) cache.getRegion(PR_ROOT_REGION_NAME, true);
} catch (IOException ieo) {
Assert.assertTrue(false, "IOException creating Partitioned Region root: " + ieo);
} catch (ClassNotFoundException cne) {
Assert.assertTrue(false, "ClassNotFoundExcpetion creating Partitioned Region root: " + cne);
}
}
Assert.assertTrue(root != null,
"Can not obtain internal Partitioned Region configuration root");
return root;
}
  // TODO rebalancing - this code was added here in the merge of -r22804:23093 from trunk
  // because changes made on trunk require this method, which was removed on the
  // prRebalancing branch. It probably needs refactoring.
  // The idea here is to remove metadata from the partitioned region for a node that
  // has left the cache.
  // A couple of options that didn't work:
  // - Remove the metadata in the region advisor for the PR instead. This doesn't work because
  // a member can close its cache and then recreate the same region, so another member
  // might end up removing metadata after the region is recreated, leading to inconsistent
  // metadata.
  // - Remove the metadata on cache closure in the member that is closing. This didn't work
  // because we can't do region operations after isClosing is set to true (to remove the
  // metadata), and removing the metadata before isClosing is set to true results in operations
  // being silently ignored because of inconsistent metadata and regions.
/**
* Clean the config meta data for a DistributedMember which has left the DistributedSystem, one
* PartitionedRegion at a time.
*/
public static void cleanUpMetaDataOnNodeFailure(InternalCache cache,
DistributedMember failedMemId) {
try {
if (cache == null || cache.getCancelCriterion().isCancelInProgress()) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Cleaning PartitionedRegion meta data for memberId={}", failedMemId);
}
Region rootReg = PartitionedRegionHelper.getPRRoot(cache, false);
if (rootReg == null) {
return;
}
final ArrayList<String> ks = new ArrayList<String>(rootReg.keySet());
if (ks.size() > 1) {
Collections.shuffle(ks, PartitionedRegion.RANDOM);
}
for (String prName : ks) {
try {
cleanUpMetaDataForRegion(cache, prName, failedMemId, null);
} catch (CancelException ignore) {
// okay to ignore this - metadata will be cleaned up by cache close operation
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Got exception in cleaning up metadata. {}", e.getMessage(), e);
}
}
}
} catch (CancelException ignore) {
// ignore
}
}
public static void cleanUpMetaDataForRegion(final InternalCache cache, final String prName,
final DistributedMember failedMemId, final Runnable postCleanupTask) {
boolean runPostCleanUp = true;
try {
final PartitionRegionConfig prConf;
Region rootReg = PartitionedRegionHelper.getPRRoot(cache, false);
if (rootReg == null) {
return;
}
try {
prConf = (PartitionRegionConfig) rootReg.get(prName);
} catch (EntryDestroyedException ignore) {
return;
}
if (prConf == null) {
        // darrel says: I'm seeing an NPE in this code after the pr->rem merge,
        // so I added this check to return early
return;
}
Set<Node> nodeList = prConf.getNodes();
if (nodeList == null) {
return;
}
for (final Node node1 : nodeList) {
if (cache.getCancelCriterion().isCancelInProgress()) {
return;
}
if (node1.getMemberId().equals(failedMemId)) {
          // Do the cleanup in another thread so we don't have the advisor locked.
          // Fix for #45365: we don't schedule an asynchronous task until
          // we have determined the node to remove (which includes the
          // serial number).
cache.getDistributionManager().getExecutors().getPrMetaDataCleanupThreadPool()
.execute(new Runnable() {
@Override
public void run() {
cleanPartitionedRegionMetaDataForNode(cache, node1, prConf, prName);
if (postCleanupTask != null) {
postCleanupTask.run();
}
}
});
runPostCleanUp = false;
return;
}
}
} finally {
if (runPostCleanUp && postCleanupTask != null) {
postCleanupTask.run();
}
}
}
/**
   * Cleans the config metadata (both the configuration data and the buckets) for a Node that
   * hosted a PartitionedRegion.
*/
private static void cleanPartitionedRegionMetaDataForNode(InternalCache cache, Node node,
PartitionRegionConfig prConf, String regionIdentifier) {
if (logger.isDebugEnabled()) {
logger.debug(
"Cleaning PartitionedRegion meta data for node={} for Partitioned Region={} configuration={}",
node, regionIdentifier, prConf);
}
PartitionedRegionHelper.removeGlobalMetadataForFailedNode(node, regionIdentifier, cache);
if (logger.isDebugEnabled()) {
logger.debug("Done Cleaning PartitionedRegion meta data for memberId={} for {}", node,
regionIdentifier);
}
}
  /*
   * The commented-out method below ran hashCode() on a given key and took the absolute value of
   * the modulus with the total bucket count. For better key distribution, possibly use MD5 or SHA
   * or any unique ID generator for the hash function.
   */
  // private static int NOSIGN = 0x7fffffff;
  /*
   * public static int getHashKey(PartitionedObject key) {
   *   PartitionedRegion pRegion = (PartitionedRegion) entryOp.getRegion();
   *   RoutingResolver resolver = pRegion.getRoutingResolver();
   *
   *   int totalNumberOfBuckets = pRegion.getTotalNumberOfBuckets();
   *   Object resolveKey = null;
   *   if (resolver == null) {
   *     resolveKey = key;
   *   } else {
   *     // resolveKey = resolver.getPartitionKey(key);
   *     resolveKey = resolver.getRoutingObject((EntryOperation) key);
   *   }
   *   int hc = resolveKey.hashCode();
   *   int bucketId = hc % totalNumberOfBuckets;
   *   // Force positive bucket ids only
   *   return Math.abs(bucketId);
   *   // We should use the same hash code spreader as most other java.util hash tables.
   *   // h += ~(h << 9);
   *   // h ^= (h >>> 14);
   *   // h += (h << 4);
   *   // h ^= (h >>> 10);
   *   // h &= NOSIGN;
   *   // return h % totalNumberOfBuckets;
   * }
   */
private static PartitionResolver getResolver(PartitionedRegion pr, Object key,
Object callbackArgument) {
// First choice is one associated with the region
PartitionResolver result = pr.getPartitionResolver();
if (result != null) {
return result;
}
// Second is the key
    if (key instanceof PartitionResolver) {
return (PartitionResolver) key;
}
// Third is the callback argument
    if (callbackArgument instanceof PartitionResolver) {
return (PartitionResolver) callbackArgument;
}
// There is no resolver.
return null;
}
/**
   * Runs hashCode() on the given key/routing object and takes the absolute value of the modulus
   * with the total bucket count. For better key distribution, possibly use MD5 or SHA or any
   * unique ID generator for the hash function.
*
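   * A typical call (hypothetical region and key):
   *
   * <pre>
   * int bucketId = PartitionedRegionHelper.getHashKey(pr, Operation.GET, "someKey", null, null);
   * </pre>
   *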
   * @param pr the partitioned region on which to operate
   * @param operation the operation being performed
   * @param key the key on which to determine the hash key
   * @param value the value of the entry, used when constructing the <code>EntryOperation</code>
   *        passed to the resolver
   * @param callbackArgument the callbackArgument passed to the <code>PartitionResolver</code> to
   *        get the routing object
   * @return the bucket id the key/routing object hashes to
*/
public static int getHashKey(PartitionedRegion pr, Operation operation, Object key, Object value,
Object callbackArgument) {
// avoid creating EntryOperation if there is no resolver
try {
return getHashKey(null, pr, operation, key, value, callbackArgument);
} catch (IllegalStateException e) { // bug #43651 - check for shutdown before throwing this
pr.getCache().getCancelCriterion().checkCancelInProgress(e);
pr.checkClosedOrDestroyed(); // GEODE-2282 - check for region destroyed before throwing it
throw e;
}
}
/**
   * Runs hashCode() on the given key/routing object and takes the absolute value of the modulus
   * with the total bucket count. For better key distribution, possibly use MD5 or SHA or any
   * unique ID generator for the hash function.
*
* @param event entry event created for this entry operation
* @return the bucket id the key/routing object hashes to
*/
public static int getHashKey(EntryOperation event) {
return getHashKey(event, null, null, null, null, null);
}
/**
   * Runs hashCode() on the given key/routing object and takes the absolute value of the modulus
   * with the total bucket count. For better key distribution, possibly use MD5 or SHA or any
   * unique ID generator for the hash function.
*
* @param event entry event created for this entry operation; can be null
* @param pr the partitioned region on which to operate
* @param operation operation
   * @param key the key on which to determine the hash key
   * @param value the value of the entry, used when constructing the <code>EntryOperation</code>
   *        passed to the resolver
   * @param callbackArgument the callbackArgument passed to the <code>PartitionResolver</code> to
   *        get the routing object
* @return the bucket id the key/routing object hashes to
*/
private static int getHashKey(EntryOperation event, PartitionedRegion pr, Operation operation,
Object key, Object value, Object callbackArgument) {
// avoid creating EntryOperation if there is no resolver
if (event != null) {
pr = (PartitionedRegion) event.getRegion();
key = event.getKey();
callbackArgument = event.getCallbackArgument();
}
PartitionResolver resolver = getResolver(pr, key, callbackArgument);
Object resolveKey = null;
if (pr.isFixedPartitionedRegion()) {
String partition = null;
if (resolver instanceof FixedPartitionResolver) {
Map<String, Integer[]> partitionMap = pr.getPartitionsMap();
if (event == null) {
event = new EntryOperationImpl(pr, operation, key, value, callbackArgument);
}
partition =
((FixedPartitionResolver) resolver).getPartitionName(event, partitionMap.keySet());
if (partition == null) {
Object[] prms = new Object[] {pr.getName(), resolver};
throw new IllegalStateException(
String.format("For region %s, partition resolver %s returned partition name null",
prms));
}
Integer[] bucketArray = partitionMap.get(partition);
if (bucketArray == null) {
Object[] prms = new Object[] {pr.getName(), partition};
throw new PartitionNotAvailableException(
String.format(
"For FixedPartitionedRegion %s, partition %s is not available on any datastore.",
prms));
}
        int numBuckets = bucketArray[1];
        resolveKey = (numBuckets == 1) ? partition : resolver.getRoutingObject(event);
} else if (resolver == null) {
throw new IllegalStateException(
String.format(
"For FixedPartitionedRegion %s, FixedPartitionResolver is not available (neither through the partition attribute partition-resolver nor key/callbackArg implementing FixedPartitionResolver)",
pr.getName()));
} else {
Object[] prms = new Object[] {pr.getName(), resolver};
throw new IllegalStateException(
String.format(
"For FixedPartitionedRegion %s, Resolver defined %s is not an instance of FixedPartitionResolver",
prms));
}
return assignFixedBucketId(pr, partition, resolveKey);
} else {
// Calculate resolveKey.
if (resolver == null) {
// no custom partitioning at all
resolveKey = key;
if (resolveKey == null) {
throw new IllegalStateException("attempting to hash null");
}
} else {
if (event == null) {
event = new EntryOperationImpl(pr, operation, key, value, callbackArgument);
}
resolveKey = resolver.getRoutingObject(event);
if (resolveKey == null) {
throw new IllegalStateException(
"The RoutingObject returned by PartitionResolver is null.");
}
}
// Finally, calculate the hash.
return getHashKey(pr, resolveKey);
}
}
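  /**
   * Maps the resolve key into the fixed partition's bucket range as
   * {@code startingBucketID + Math.abs(resolveKey.hashCode() % partitionNumBuckets)}. For example,
   * a partition whose buckets start at id 20 with 4 buckets and a resolve key whose hashCode() is
   * 7 yields bucket 23.
   */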
private static int assignFixedBucketId(PartitionedRegion pr, String partition,
Object resolveKey) {
int startingBucketID = 0;
int partitionNumBuckets = 0;
boolean isPartitionAvailable = pr.getPartitionsMap().containsKey(partition);
    Integer[] partitionDetails = pr.getPartitionsMap().get(partition);
    if (isPartitionAvailable) {
      startingBucketID = partitionDetails[0];
      partitionNumBuckets = partitionDetails[1];
int hc = resolveKey.hashCode();
int bucketId = Math.abs(hc % partitionNumBuckets);
int partitionBucketID = bucketId + startingBucketID;
assert partitionBucketID != KeyInfo.UNKNOWN_BUCKET;
return partitionBucketID;
}
List<FixedPartitionAttributesImpl> localFPAs = pr.getFixedPartitionAttributesImpl();
if (localFPAs != null) {
for (FixedPartitionAttributesImpl fpa : localFPAs) {
if (fpa.getPartitionName().equals(partition)) {
isPartitionAvailable = true;
partitionNumBuckets = fpa.getNumBuckets();
startingBucketID = fpa.getStartingBucketID();
break;
}
}
}
if (!isPartitionAvailable) {
List<FixedPartitionAttributesImpl> remoteFPAs =
pr.getRegionAdvisor().adviseAllFixedPartitionAttributes();
for (FixedPartitionAttributesImpl fpa : remoteFPAs) {
if (fpa.getPartitionName().equals(partition)) {
isPartitionAvailable = true;
partitionNumBuckets = fpa.getNumBuckets();
startingBucketID = fpa.getStartingBucketID();
break;
}
}
}
    if (isPartitionAvailable && partitionNumBuckets == 0) {
      Object[] prms = new Object[] {pr.getName(), partition};
      throw new IllegalStateException(
          String.format(
              "For region %s, partition %s has partition-num-buckets set to 0. Buckets cannot be created on this partition.",
              prms));
    }
if (!isPartitionAvailable) {
Object[] prms = new Object[] {pr.getName(), partition};
throw new PartitionNotAvailableException(
String.format("For region %s, partition name %s is not available on any datastore.",
prms));
}
int hc = resolveKey.hashCode();
int bucketId = Math.abs(hc % partitionNumBuckets);
int partitionBucketID = bucketId + startingBucketID;
assert partitionBucketID != KeyInfo.UNKNOWN_BUCKET;
return partitionBucketID;
}
public static int getHashKey(PartitionedRegion pr, Object routingObject) {
return getHashKey(routingObject, pr.getTotalNumberOfBuckets());
}
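  /**
   * Computes the bucket id for a routing object as
   * {@code Math.abs(routingObject.hashCode() % totalNumBuckets)}. For example, a routing object
   * whose hashCode() is -7 with 5 total buckets maps to bucket 2.
   */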
public static int getHashKey(Object routingObject, int totalNumBuckets) {
int hc = routingObject.hashCode();
int bucketId = hc % totalNumBuckets;
// Force positive bucket ids only
return Math.abs(bucketId);
}
public static PartitionedRegion getPartitionedRegion(String prName, Cache cache) {
Region region = cache.getRegion(prName);
    if (region instanceof PartitionedRegion) {
      return (PartitionedRegion) region;
    }
return null;
}
public static boolean isBucketRegion(String fullPath) {
return getBucketName(fullPath) != null;
}
/**
* Find a ProxyBucketRegion by parsing the region fullPath
*
* @param fullPath full region path to parse
* @param postInit true if caller should wait for bucket initialization to complete
* @return ProxyBucketRegion as Bucket or null if not found
* @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed
*/
public static Bucket getProxyBucketRegion(Cache cache, String fullPath, boolean postInit)
throws PRLocallyDestroyedException {
if (cache == null) {
// No cache
return null;
}
// fullPath = /__PR/_B_1_10
String bucketName = getBucketName(fullPath);
if (bucketName == null) {
return null;
}
String prid = getPRPath(bucketName);
// PartitionedRegion region =
// PartitionedRegion.getPRFromId(Integer.parseInt(prid));
Region region;
final InitializationLevel oldLevel = LocalRegion.setThreadInitLevelRequirement(ANY_INIT);
try {
region = cache.getRegion(prid);
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
    if (!(region instanceof PartitionedRegion)) {
return null;
}
PartitionedRegion pr = (PartitionedRegion) region;
int bid = getBucketId(bucketName);
RegionAdvisor ra = (RegionAdvisor) pr.getDistributionAdvisor();
if (postInit) {
return ra.getBucketPostInit(bid);
} else if (!ra.areBucketsInitialized()) {
      // While the RegionAdvisor may be available, its bucket metadata may not be constructed yet
return null;
} else {
return ra.getBucket(bid);
}
}
private static final String BUCKET_FULL_PATH_PREFIX =
PR_ROOT_REGION_NAME + Region.SEPARATOR + BUCKET_REGION_PREFIX;
/**
* Get the bucket string by parsing the region fullPath
*
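   * For example, for a partitioned region {@code /myPR} (escaped to {@code _myPR}), bucket 10 has
   * the full path {@code /__PR/_B__myPR_10}, and {@code getBucketName("/__PR/_B__myPR_10")}
   * returns {@code "_B__myPR_10"}.
   *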
* @param bucketFullPath full region path to parse
* @return the bucket string or null if no bucket string is present
*/
public static String getBucketName(String bucketFullPath) {
if (bucketFullPath == null || bucketFullPath.length() == 0) {
return null;
}
int idxStartRoot = bucketFullPath.indexOf(BUCKET_FULL_PATH_PREFIX);
// parse bucketString
if (idxStartRoot != -1) {
int idxEndRoot = idxStartRoot + PR_ROOT_REGION_NAME.length() + Region.SEPARATOR.length();
return bucketFullPath.substring(idxEndRoot);
}
if (logger.isDebugEnabled()) {
logger.debug("getBucketString no match fullPath={}", bucketFullPath);
}
return null;
}
public static String getBucketFullPath(String prFullPath, int bucketId) {
String name = getBucketName(prFullPath, bucketId);
    if (name != null) {
      return Region.SEPARATOR + PR_ROOT_REGION_NAME + Region.SEPARATOR + name;
    }
    return null;
}
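  /**
   * Escapes a region path so it can be embedded in a bucket region name: literal underscores are
   * doubled, then each path separator is replaced by an underscore. For example,
   * {@code escapePRPath("/my_PR")} returns {@code "_my__PR"}; {@link #unescapePRPath(String)}
   * reverses the transformation.
   */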
public static String escapePRPath(String prFullPath) {
String escaped = prFullPath.replace("_", "__");
escaped = escaped.replace(Region.SEPARATOR_CHAR, '_');
return escaped;
}
public static final String TWO_SEPARATORS = Region.SEPARATOR + Region.SEPARATOR;
public static String unescapePRPath(String escapedPath) {
String path = escapedPath.replace('_', Region.SEPARATOR_CHAR);
path = path.replace(TWO_SEPARATORS, "_");
return path;
}
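  /**
   * Builds the bucket region name for the given PR path and bucket id. For example,
   * {@code getBucketName("/myPR", 10)} returns {@code "_B__myPR_10"}.
   */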
public static String getBucketName(String prPath, int bucketId) {
return PartitionedRegionHelper.BUCKET_REGION_PREFIX
+ PartitionedRegionHelper.escapePRPath(prPath) + PartitionedRegion.BUCKET_NAME_SEPARATOR
+ bucketId;
}
/**
   * Returns the PR path given the bucketName (see getBucketName).
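   * For example, {@code getPRPath("_B__myPR_10")} returns {@code "/myPR"}.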
*/
public static String getPRPath(String bucketName) {
// bucketName = _B_PRNAME_10
int pridIdx = PartitionedRegionHelper.BUCKET_REGION_PREFIX.length();
int bidSepIdx = bucketName.lastIndexOf(PartitionedRegion.BUCKET_NAME_SEPARATOR);
Assert.assertTrue(bidSepIdx > -1, "getProxyBucketRegion failed on " + bucketName);
return unescapePRPath(bucketName.substring(pridIdx, bidSepIdx));
}
/**
   * Returns the bucket id given the bucketName (see getBucketName).
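   * For example, {@code getBucketId("_B__myPR_10")} returns {@code 10}.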
*/
public static int getBucketId(String bucketName) {
// bucketName = _B_PRNAME_10
int bidSepIdx = bucketName.lastIndexOf(PartitionedRegion.BUCKET_NAME_SEPARATOR);
String bid = bucketName.substring(bidSepIdx + 1);
return Integer.parseInt(bid);
}
/**
   * Returns true if the region denoted by the provided fullPath is a sub-region, false otherwise.
   * For example, "/REGION1" returns false, while "/REGION1/REGION2" returns true, meaning that
   * REGION2 is a sub-region.
   *
   * @param fullPath full path of the region
   * @return true if the given full path denotes a sub-region, false otherwise
*/
public static boolean isSubRegion(String fullPath) {
boolean isSubRegion = false;
if (null != fullPath) {
int idx = fullPath.indexOf(Region.SEPARATOR, Region.SEPARATOR.length());
      if (idx >= 0) {
        isSubRegion = true;
      }
}
return isSubRegion;
}
/**
   * Utility method to log a warning when the nodeList in the b2n region is found to be empty,
   * which signifies a potential data-loss scenario.
   *
   * @param partitionedRegion the region whose bucket is being checked
   * @param bucketId Id of the bucket whose nodeList in b2n is empty
   * @param callingMethod the name of the calling method
*/
public static void logForDataLoss(PartitionedRegion partitionedRegion, int bucketId,
String callingMethod) {
if (!Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PRDebug")) {
return;
}
Region root = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache());
// Region allPartitionedRegions = PartitionedRegionHelper.getPRConfigRegion(
// root, partitionedRegion.getCache());
PartitionRegionConfig prConfig =
(PartitionRegionConfig) root.get(partitionedRegion.getRegionIdentifier());
    if (prConfig == null) {
      return;
    }
Set members = partitionedRegion.getDistributionManager().getDistributionManagerIds();
logger.warn(
"DATALOSS ( {} ) :: Size of nodeList After verifyBucketNodes for bucket ID, {} is 0",
callingMethod, bucketId);
logger.warn("DATALOSS ( {} ) :: NodeList from prConfig, {}",
callingMethod, printCollection(prConfig.getNodes()));
logger.warn("DATALOSS ( {} ) :: Current Membership List, {}",
callingMethod, printCollection(members));
}
/**
   * Utility method to render a collection as a bracketed, comma-separated string.
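   * For example, {@code printCollection(Arrays.asList(1, 2))} returns {@code "[1, 2]"}.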
   *
   * @param c the collection to print, possibly null
   * @return the rendered string, or {@code "[null]"} if the collection is null
*/
  public static String printCollection(Collection<?> c) {
    if (c != null) {
      StringBuilder sb = new StringBuilder("[");
      Iterator<?> itr = c.iterator();
while (itr.hasNext()) {
sb.append(itr.next());
if (itr.hasNext()) {
sb.append(", ");
}
}
sb.append("]");
return sb.toString();
} else {
return "[null]";
}
}
public static FixedPartitionAttributesImpl getFixedPartitionAttributesForBucket(
PartitionedRegion pr, int bucketId) {
List<FixedPartitionAttributesImpl> localFPAs = pr.getFixedPartitionAttributesImpl();
if (localFPAs != null) {
for (FixedPartitionAttributesImpl fpa : localFPAs) {
if (fpa.hasBucket(bucketId)) {
return fpa;
}
}
}
List<FixedPartitionAttributesImpl> remoteFPAs =
pr.getRegionAdvisor().adviseAllFixedPartitionAttributes();
for (FixedPartitionAttributesImpl fpa : remoteFPAs) {
if (fpa.hasBucket(bucketId)) {
return fpa;
}
}
Object[] prms = new Object[] {pr.getName(), bucketId};
throw new PartitionNotAvailableException(
String.format(
"For FixedPartitionedRegion %s, Fixed partition is not defined for bucket id %s on any datastore",
prms));
}
private static Set<String> getAllAvailablePartitions(PartitionedRegion region) {
Set<String> partitionSet = new HashSet<String>();
List<FixedPartitionAttributesImpl> localFPAs = region.getFixedPartitionAttributesImpl();
if (localFPAs != null) {
for (FixedPartitionAttributesImpl fpa : localFPAs) {
partitionSet.add(fpa.getPartitionName());
}
}
List<FixedPartitionAttributesImpl> remoteFPAs =
region.getRegionAdvisor().adviseAllFixedPartitionAttributes();
for (FixedPartitionAttributes fpa : remoteFPAs) {
partitionSet.add(fpa.getPartitionName());
}
return Collections.unmodifiableSet(partitionSet);
}
public static Set<FixedPartitionAttributes> getAllFixedPartitionAttributes(
PartitionedRegion region) {
Set<FixedPartitionAttributes> fpaSet = new HashSet<FixedPartitionAttributes>();
List<FixedPartitionAttributesImpl> localFPAs = region.getFixedPartitionAttributesImpl();
if (localFPAs != null) {
fpaSet.addAll(localFPAs);
}
List<FixedPartitionAttributesImpl> remoteFPAs =
region.getRegionAdvisor().adviseAllFixedPartitionAttributes();
fpaSet.addAll(remoteFPAs);
return fpaSet;
}
private static class MemberFailureListener implements MembershipListener {
    private final InternalCache cache;
MemberFailureListener(InternalCache cache) {
this.cache = cache;
}
@Override
public void memberJoined(DistributionManager distributionManager,
InternalDistributedMember id) {
}
@Override
public void memberDeparted(DistributionManager distributionManager,
final InternalDistributedMember id, boolean crashed) {
PartitionedRegionHelper.cleanUpMetaDataOnNodeFailure(cache, id);
}
@Override
public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
InternalDistributedMember whoSuspected, String reason) {}
@Override
public void quorumLost(DistributionManager distributionManager,
Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}
}
static class FixedPartitionAttributesListener extends CacheListenerAdapter {
private static final Logger logger = LogService.getLogger();
@Override
public void afterCreate(EntryEvent event) {
PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue();
if (!prConfig.getElderFPAs().isEmpty()) {
updatePartitionMap(prConfig);
}
}
@Override
public void afterUpdate(EntryEvent event) {
PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue();
if (!prConfig.getElderFPAs().isEmpty()) {
updatePartitionMap(prConfig);
}
}
private void updatePartitionMap(PartitionRegionConfig prConfig) {
int prId = prConfig.getPRId();
PartitionedRegion pr = null;
try {
pr = PartitionedRegion.getPRFromId(prId);
if (pr != null) {
Map<String, Integer[]> partitionMap = pr.getPartitionsMap();
for (FixedPartitionAttributesImpl fxPrAttr : prConfig.getElderFPAs()) {
partitionMap.put(fxPrAttr.getPartitionName(),
new Integer[] {fxPrAttr.getStartingBucketID(), fxPrAttr.getNumBuckets()});
}
}
} catch (PRLocallyDestroyedException e) {
logger.debug("PRLocallyDestroyedException : Region ={} is locally destroyed on this node",
prConfig.getPRId(), e);
}
}
}
}