| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.ANY_INIT; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.annotations.internal.MutableForTesting; |
| import org.apache.geode.cache.DiskStore; |
| import org.apache.geode.cache.EntryDestroyedException; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.execute.Function; |
| import org.apache.geode.cache.execute.FunctionService; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.LocalRegion.InitializationLevel; |
| import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; |
| import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException; |
| import org.apache.geode.internal.cache.persistence.PRPersistentConfig; |
| import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.util.internal.GeodeGlossary; |
| |
| /** |
| * An utility class to retrieve colocated regions in a colocation hierarchy in various scenarios |
| * |
| * @since GemFire 6.0 |
| */ |
| public class ColocationHelper { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * Whether to ignore missing parallel queues on restart if they are not attached to the region. |
| * See bug 50120. Mutable for tests. |
| */ |
| @MutableForTesting |
| public static boolean IGNORE_UNRECOVERED_QUEUE = |
| Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "IGNORE_UNRECOVERED_QUEUE"); |
| |
| /** |
| * An utility method to retrieve colocated region of a given partitioned region |
| * |
| * @return colocated PartitionedRegion |
| * @throws IllegalStateException for missing colocated region |
| * @since GemFire 5.8Beta |
| */ |
| public static PartitionedRegion getColocatedRegion(final PartitionedRegion partitionedRegion) { |
| Assert.assertTrue(partitionedRegion != null); // precondition1 |
| String colocatedWith = partitionedRegion.getPartitionAttributes().getColocatedWith(); |
| if (colocatedWith == null) { |
| // the region is not colocated with any region |
| return null; |
| } |
| Region prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache()); |
| PartitionRegionConfig prConf = |
| (PartitionRegionConfig) prRoot.get(getRegionIdentifier(colocatedWith)); |
| if (prConf == null) { |
| partitionedRegion.getCache().getCancelCriterion().checkCancelInProgress(null); |
| throw new IllegalStateException( |
| String.format( |
| "Region specified in 'colocated-with' (%s) for region %s does not exist. It should be created before setting 'colocated-with' attribute for this region.", |
| new Object[] {colocatedWith, partitionedRegion.getFullPath()})); |
| } |
| int prID = prConf.getPRId(); |
| PartitionedRegion colocatedPR = null; |
| try { |
| colocatedPR = PartitionedRegion.getPRFromId(prID); |
| if (colocatedPR != null) { |
| colocatedPR.waitOnBucketMetadataInitialization(); |
| } else { |
| partitionedRegion.getCache().getCancelCriterion().checkCancelInProgress(null); |
| throw new IllegalStateException( |
| String.format( |
| "Region specified in 'colocated-with' (%s) for region %s does not exist. It should be created before setting 'colocated-with' attribute for this region.", |
| new Object[] {colocatedWith, partitionedRegion.getFullPath()})); |
| } |
| } catch (PRLocallyDestroyedException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "PRLocallyDestroyedException : Region with prId={} is locally destroyed on this node", |
| prID, e); |
| } |
| } |
| return colocatedPR; |
| } |
| |
| /** |
| * An utility to make sure that a member contains all of the partitioned regions that are |
| * colocated with a given region on other members. TODO rebalance - this is rather inefficient, |
| * and probably all this junk should be in the advisor. |
| */ |
| public static boolean checkMembersColocation(PartitionedRegion partitionedRegion, |
| InternalDistributedMember member) { |
| List<PartitionRegionConfig> tempcolocatedRegions = new ArrayList<PartitionRegionConfig>(); |
| Region prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache()); |
| PartitionRegionConfig regionConfig = |
| (PartitionRegionConfig) prRoot.get(partitionedRegion.getRegionIdentifier()); |
| // The region was probably concurrently destroyed |
| if (regionConfig == null) { |
| return false; |
| } |
| tempcolocatedRegions.add(regionConfig); |
| List<PartitionRegionConfig> colocatedRegions = |
| new ArrayList<PartitionRegionConfig>(tempcolocatedRegions); |
| PartitionRegionConfig prConf = null; |
| do { |
| PartitionRegionConfig tempToBeColocatedWith = tempcolocatedRegions.remove(0); |
| for (Iterator itr = prRoot.keySet().iterator(); itr.hasNext();) { |
| String prName = (String) itr.next(); |
| try { |
| prConf = (PartitionRegionConfig) prRoot.get(prName); |
| } catch (EntryDestroyedException ignore) { |
| continue; |
| } |
| if (prConf == null) { |
| // darrel says: I'm seeing an NPE in this code after pr->rem |
| // merge so I added this check and continue |
| continue; |
| } |
| if (prConf.getColocatedWith() != null) { |
| if (prConf.getColocatedWith().equals(tempToBeColocatedWith.getFullPath()) |
| || (SEPARATOR + prConf.getColocatedWith()) |
| .equals(tempToBeColocatedWith.getFullPath())) { |
| colocatedRegions.add(prConf); |
| tempcolocatedRegions.add(prConf); |
| } |
| } |
| } |
| } while (!tempcolocatedRegions.isEmpty()); |
| |
| PartitionRegionConfig tempColocatedWith = regionConfig; |
| prConf = null; |
| while (true) { |
| String colocatedWithRegionName = tempColocatedWith.getColocatedWith(); |
| if (colocatedWithRegionName == null) |
| break; |
| else { |
| prConf = (PartitionRegionConfig) prRoot.get(getRegionIdentifier(colocatedWithRegionName)); |
| if (prConf == null) { |
| break; |
| } |
| colocatedRegions.add(tempColocatedWith); |
| tempColocatedWith = prConf; |
| } |
| } |
| |
| // Now check to make sure that all of the colocated regions |
| // Have this member. |
| |
| // We don't need a hostname because the equals method doesn't check it. |
| for (PartitionRegionConfig config : colocatedRegions) { |
| if (config.isColocationComplete() && !config.containsMember(member)) { |
| return false; |
| } |
| } |
| |
| // Check to make sure all of the persisted regions that are colocated |
| // with this region have been created. |
| if (hasOfflineColocatedChildRegions(partitionedRegion)) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Returns true if there are regions that are persisted on this member and were previously |
| * colocated with the given region, but have not yet been created. |
| * |
| * @param region The parent region |
| * @return true if there are any child regions that are persisted on this member, but have not yet |
| * been created. |
| */ |
| private static boolean hasOfflineColocatedChildRegions(PartitionedRegion region) { |
| boolean hasOfflineChildren = false; |
| final InitializationLevel oldLevel = LocalRegion.setThreadInitLevelRequirement(ANY_INIT); |
| try { |
| InternalCache cache = region.getCache(); |
| Collection<DiskStore> stores = cache.listDiskStores(); |
| // Look through all of the disk stores for offline colocated child regions |
| for (DiskStore diskStore : stores) { |
| // Look at all of the partitioned regions. |
| |
| for (Map.Entry<String, PRPersistentConfig> entry : ((DiskStoreImpl) diskStore).getAllPRs() |
| .entrySet()) { |
| |
| PRPersistentConfig config = entry.getValue(); |
| String childName = entry.getKey(); |
| |
| // Check to see if they're colocated with this region. |
| if (region.getFullPath().equals(config.getColocatedWith())) { |
| PartitionedRegion childRegion = (PartitionedRegion) cache.getRegion(childName); |
| |
| if (childRegion == null) { |
| // If the child region is offline, return true |
| // unless it is a parallel queue that the user has removed. |
| if (!ignoreUnrecoveredQueue(region, childName)) { |
| region.addMissingColocatedRegionLogger(childName); |
| hasOfflineChildren = true; |
| } |
| } else { |
| // Otherwise, look for offline children of that region. |
| if (hasOfflineColocatedChildRegions(childRegion)) { |
| hasOfflineChildren = true; |
| // Add the offline children of this child to the region's missingChildren list |
| region.addMissingColocatedRegionLogger(childRegion); |
| } |
| } |
| } |
| } |
| } |
| } finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| } |
| return hasOfflineChildren; |
| } |
| |
| private static boolean ignoreUnrecoveredQueue(PartitionedRegion region, String childName) { |
| // Hack for #50120 if the childRegion is an async queue, but we |
| // no longer define the async queue, ignore it. |
| if (!ParallelGatewaySenderQueue.isParallelQueue(childName)) { |
| return false; |
| } |
| |
| String senderId = ParallelGatewaySenderQueue.getSenderId(childName); |
| |
| if (!region.getAsyncEventQueueIds().contains(senderId) |
| && !region.getParallelGatewaySenderIds().contains(senderId) && IGNORE_UNRECOVERED_QUEUE) { |
| return true; |
| } |
| |
| // TODO Auto-generated method stub |
| return false; |
| } |
| |
| /** |
| * A utility to check to see if a region has been created on all of the VMs that host the regions |
| * this region is colocated with. |
| */ |
| public static boolean isColocationComplete(PartitionedRegion region) { |
| Region prRoot = PartitionedRegionHelper.getPRRoot(region.getCache()); |
| PartitionRegionConfig config = (PartitionRegionConfig) prRoot.get(region.getRegionIdentifier()); |
| // Fix for bug 40075. There is race between this call and the region being concurrently |
| // destroyed. |
| if (config == null) { |
| Assert.assertTrue(region.isDestroyed() || region.isClosed, |
| "Region is not destroyed, but there is no entry in the prRoot for region " + region); |
| return false; |
| } |
| return config.isColocationComplete(); |
| } |
| |
| /** |
| * An utility method to retrieve all partitioned regions(excluding self) in a colocation chain<br> |
| * <p> |
| * For example, shipmentPR is colocated with orderPR and orderPR is colocated with customerPR <br> |
| * <br> |
| * getAllColocationRegions(customerPR) --> List{orderPR, shipmentPR}<br> |
| * getAllColocationRegions(orderPR) --> List{customerPR, shipmentPR}<br> |
| * getAllColocationRegions(shipmentPR) --> List{customerPR, orderPR}<br> |
| * |
| * @return List of all partitioned regions (excluding self) in a colocated chain |
| * @since GemFire 5.8Beta |
| */ |
| public static Map<String, PartitionedRegion> getAllColocationRegions( |
| PartitionedRegion partitionedRegion) { |
| Map<String, PartitionedRegion> colocatedRegions = new HashMap<String, PartitionedRegion>(); |
| List<PartitionedRegion> colocatedByRegion = partitionedRegion.getColocatedByList(); |
| if (colocatedByRegion.size() != 0) { |
| List<PartitionedRegion> tempcolocatedRegions = |
| new ArrayList<PartitionedRegion>(colocatedByRegion); |
| do { |
| PartitionedRegion pRegion = tempcolocatedRegions.remove(0); |
| pRegion.waitOnBucketMetadataInitialization(); |
| colocatedRegions.put(pRegion.getFullPath(), pRegion); |
| tempcolocatedRegions.addAll(pRegion.getColocatedByList()); |
| } while (!tempcolocatedRegions.isEmpty()); |
| } |
| PartitionedRegion tempColocatedWith = partitionedRegion; |
| while (true) { |
| PartitionedRegion colocatedWithRegion = tempColocatedWith.getColocatedWithRegion(); |
| if (colocatedWithRegion == null) |
| break; |
| else { |
| colocatedRegions.put(colocatedWithRegion.getFullPath(), colocatedWithRegion); |
| tempColocatedWith = colocatedWithRegion; |
| } |
| } |
| return colocatedRegions; |
| } |
| |
| /** |
| * gets local data of colocated regions on a particular data store |
| * |
| * @return map of region name to local colocated regions |
| * @since GemFire 5.8Beta |
| */ |
| public static Map<String, Region> getAllColocatedLocalDataSets( |
| PartitionedRegion partitionedRegion, InternalRegionFunctionContext context) { |
| Map<String, PartitionedRegion> colocatedRegions = getAllColocationRegions(partitionedRegion); |
| Map<String, Region> colocatedLocalRegions = new HashMap<String, Region>(); |
| for (Iterator itr = colocatedRegions.entrySet().iterator(); itr.hasNext();) { |
| Map.Entry me = (Entry) itr.next(); |
| final Region pr = (Region) me.getValue(); |
| colocatedLocalRegions.put((String) me.getKey(), context.getLocalDataSet(pr)); |
| } |
| return colocatedLocalRegions; |
| } |
| |
| public static Map<String, LocalDataSet> constructAndGetAllColocatedLocalDataSet( |
| PartitionedRegion region, int[] bucketArray) { |
| Map<String, LocalDataSet> colocatedLocalDataSets = new HashMap<String, LocalDataSet>(); |
| if (region.getColocatedWith() == null && (!region.isColocatedBy())) { |
| colocatedLocalDataSets.put(region.getFullPath(), |
| new LocalDataSet(region, bucketArray)); |
| return colocatedLocalDataSets; |
| } |
| Map<String, PartitionedRegion> colocatedRegions = |
| ColocationHelper.getAllColocationRegions(region); |
| for (Region colocatedRegion : colocatedRegions.values()) { |
| colocatedLocalDataSets.put(colocatedRegion.getFullPath(), |
| new LocalDataSet((PartitionedRegion) colocatedRegion, bucketArray)); |
| } |
| colocatedLocalDataSets.put(region.getFullPath(), |
| new LocalDataSet(region, bucketArray)); |
| return colocatedLocalDataSets; |
| } |
| |
| public static Map<String, LocalDataSet> getColocatedLocalDataSetsForBuckets( |
| PartitionedRegion region, Set<Integer> bucketSet) { |
| if (region.getColocatedWith() == null && (!region.isColocatedBy())) { |
| return Collections.emptyMap(); |
| } |
| Map<String, LocalDataSet> ret = new HashMap<String, LocalDataSet>(); |
| Map<String, PartitionedRegion> colocatedRegions = |
| ColocationHelper.getAllColocationRegions(region); |
| for (Region colocatedRegion : colocatedRegions.values()) { |
| ret.put(colocatedRegion.getFullPath(), |
| new LocalDataSet((PartitionedRegion) colocatedRegion, bucketSet)); |
| } |
| return ret; |
| } |
| |
| /** |
| * A utility method to retrieve all child partitioned regions that are directly colocated to the |
| * specified partitioned region.<br> |
| * <p> |
| * For example, shipmentPR is colocated with orderPR and orderPR is colocated with customerPR. |
| * <br> |
| * getColocatedChildRegions(customerPR) will return List{orderPR}<br> |
| * getColocatedChildRegions(orderPR) will return List{shipmentPR}<br> |
| * getColocatedChildRegions(shipmentPR) will return empty List{}<br> |
| * |
| * @return list of all child partitioned regions colocated with the region |
| * @since GemFire 5.8Beta |
| */ |
| public static List<PartitionedRegion> getColocatedChildRegions( |
| PartitionedRegion partitionedRegion) { |
| List<PartitionedRegion> colocatedChildRegions = new ArrayList<PartitionedRegion>(); |
| Region prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache()); |
| PartitionRegionConfig prConf = null; |
| // final List allPRNamesList = new ArrayList(prRoot.keySet()); |
| Iterator itr = prRoot.keySet().iterator(); |
| while (itr.hasNext()) { |
| try { |
| String prName = (String) itr.next(); |
| if (prName.equals(partitionedRegion.getRegionIdentifier())) { |
| // region can't be a child of itself |
| continue; |
| } |
| try { |
| prConf = (PartitionRegionConfig) prRoot.get(prName); |
| } catch (EntryDestroyedException ignore) { |
| continue; |
| } |
| if (prConf == null) { |
| // darrel says: I'm seeing an NPE in this code after pr->rem |
| // merge so I added this check and continue |
| continue; |
| } |
| int prID = prConf.getPRId(); |
| PartitionedRegion prRegion = PartitionedRegion.getPRFromId(prID); |
| if (prRegion != null) { |
| if (prRegion.getColocatedWith() != null) { |
| if (prRegion.getColocatedWith().equals(partitionedRegion.getFullPath()) |
| || (SEPARATOR + prRegion.getColocatedWith()) |
| .equals(partitionedRegion.getFullPath())) { |
| // only regions directly colocatedWith partitionedRegion are |
| // added to the list... |
| prRegion.waitOnBucketMetadataInitialization(); |
| colocatedChildRegions.add(prRegion); |
| } |
| } |
| } |
| } catch (PRLocallyDestroyedException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("PRLocallyDestroyedException : Region ={} is locally destroyed on this node", |
| prConf.getPRId(), e); |
| } |
| } catch (RegionDestroyedException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("RegionDestroyedException : Region ={} is destroyed.", prConf.getPRId(), e); |
| } |
| } |
| } |
| |
| // Fix for 44484 - Make the list of colocated child regions |
| // is always in the same order on all nodes. |
| Collections.sort(colocatedChildRegions, new Comparator<PartitionedRegion>() { |
| @Override |
| public int compare(PartitionedRegion o1, PartitionedRegion o2) { |
| if (o1.isShadowPR() == o2.isShadowPR()) { |
| return o1.getFullPath().compareTo(o2.getFullPath()); |
| } |
| if (o1.isShadowPR()) { |
| return 1; |
| } |
| return -1; |
| } |
| }); |
| return colocatedChildRegions; |
| } |
| |
| // TODO why do we have this method here? |
| public static Function getFunctionInstance(Serializable function) { |
| Function functionInstance = null; |
| if (function instanceof String) { |
| functionInstance = FunctionService.getFunction((String) function); |
| Assert.assertTrue(functionInstance != null, |
| "Function " + function + " is not registered on this node "); |
| } else { |
| functionInstance = (Function) function; |
| } |
| return functionInstance; |
| } |
| |
| public static PartitionedRegion getLeaderRegion(PartitionedRegion prRegion) { |
| PartitionedRegion parentRegion; |
| |
| while ((parentRegion = getColocatedRegion(prRegion)) != null) { |
| prRegion = parentRegion; |
| } |
| |
| return prRegion; |
| } |
| |
| private static String getRegionIdentifier(String regionName) { |
| if (regionName.startsWith(SEPARATOR)) { |
| return regionName.replace(SEPARATOR, "#"); |
| } else { |
| return "#" + regionName.replace(SEPARATOR, "#"); |
| } |
| } |
| |
| /** |
| * Test to see if there are any persistent child regions of a partitioned region. |
| */ |
| public static boolean hasPersistentChildRegion(PartitionedRegion region) { |
| for (PartitionedRegion child : getColocatedChildRegions(region)) { |
| if (child.getDataPolicy().withPersistence()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| } |