| /* |
| * ========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| * ========================================================================= |
| */ |
| package com.gemstone.gemfire.cache.client.internal; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArraySet; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.SystemFailure; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.EntryOperation; |
| import com.gemstone.gemfire.cache.FixedPartitionResolver; |
| import com.gemstone.gemfire.cache.Operation; |
| import com.gemstone.gemfire.cache.PartitionResolver; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.distributed.internal.ServerLocation; |
| import com.gemstone.gemfire.internal.cache.BucketServerLocation66; |
| import com.gemstone.gemfire.internal.cache.EntryOperationImpl; |
| import com.gemstone.gemfire.internal.cache.LocalRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| |
| /** |
| * Maintains {@link ClientPartitionAdvisor} for Partitioned Regions on servers |
| * Client operations will consult this service to identify the server locations |
| * on which the data for the client operation is residing |
| * |
| * @author Suranjan Kumar |
| * @author Yogesh Mahajan |
| * |
| * @since 6.5 |
| * |
| */ |
| public final class ClientMetadataService { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| private final Cache cache; |
| |
| private final Set<String> nonPRs = new HashSet<String>(); |
| |
| private boolean HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP = Boolean.getBoolean("gemfire.PoolImpl.honourServerGroupsInPRSingleHop"); |
| |
| public static final int SIZE_BYTES_ARRAY_RECEIVED = 2; |
| |
| public static final int INITIAL_VERSION = 0; |
| |
| /** random number generator used in pruning */ |
| private final Random rand = new Random(); |
| |
| private volatile boolean isMetadataStable = true; |
| |
| public ClientMetadataService(Cache cache) { |
| this.cache = cache; |
| } |
| |
| private final Map<String, ClientPartitionAdvisor> clientPRAdvisors = new ConcurrentHashMap<String, ClientPartitionAdvisor>(); |
| private final Map<String, Set<ClientPartitionAdvisor>> colocatedPRAdvisors = new ConcurrentHashMap<String, Set<ClientPartitionAdvisor>>(); |
| |
| private PartitionResolver getResolver(Region r, Object key, |
| Object callbackArgument) { |
| // First choice is one associated with the region |
| final String regionFullPath = r.getFullPath(); |
| ClientPartitionAdvisor advisor = this |
| .getClientPartitionAdvisor(regionFullPath); |
| PartitionResolver result = null; |
| if (advisor != null) { |
| result = advisor.getPartitionResolver(); |
| } |
| |
| if (result != null) { |
| return result; |
| } |
| |
| // Second is the key |
| if (key != null && key instanceof PartitionResolver) { |
| return (PartitionResolver)key; |
| } |
| |
| // Third is the callback argument |
| if (callbackArgument != null |
| && callbackArgument instanceof PartitionResolver) { |
| return (PartitionResolver)callbackArgument; |
| } |
| // There is no resolver. |
| return null; |
| } |
| |
| public ServerLocation getBucketServerLocation(Region region, |
| Operation operation, Object key, Object value, Object callbackArg) { |
| ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region.getFullPath()); |
| if (prAdvisor == null) { |
| return null; |
| } |
| int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets(); |
| |
| final PartitionResolver resolver = getResolver(region, key, callbackArg); |
| Object resolveKey; |
| EntryOperation entryOp = null; |
| if (resolver == null) { |
| // client has not registered PartitionResolver |
| // Assuming even PR at server side is not using PartitionResolver |
| resolveKey = key; |
| } |
| else { |
| entryOp = new EntryOperationImpl(region, operation, key, |
| value, callbackArg); |
| resolveKey = resolver.getRoutingObject(entryOp); |
| if (resolveKey == null) { |
| throw new IllegalStateException( |
| LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL |
| .toLocalizedString()); |
| } |
| } |
| int bucketId; |
| if (resolver instanceof FixedPartitionResolver) { |
| if (entryOp == null) { |
| entryOp = new EntryOperationImpl(region, |
| Operation.FUNCTION_EXECUTION, key, null, null); |
| } |
| String partition = ((FixedPartitionResolver)resolver).getPartitionName( |
| entryOp, prAdvisor.getFixedPartitionNames()); |
| if (partition == null) { |
| Object[] prms = new Object[] { region.getName(), resolver }; |
| throw new IllegalStateException( |
| LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL |
| .toLocalizedString(prms)); |
| } |
| else { |
| bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey); |
| if (bucketId == -1) { |
| // scheduleGetPRMetaData((LocalRegion)region); |
| return null; |
| } |
| |
| } |
| }else { |
| bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets); |
| } |
| |
| ServerLocation bucketServerLocation = getServerLocation(region, operation, |
| bucketId); |
| ServerLocation location = null; |
| if (bucketServerLocation != null) |
| location = new ServerLocation(bucketServerLocation.getHostName(), |
| bucketServerLocation.getPort()); |
| return location; |
| } |
| |
| private ServerLocation getServerLocation(Region region, Operation operation, |
| int bucketId) { |
| final String regionFullPath = region.getFullPath(); |
| ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath); |
| if (prAdvisor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("ClientMetadataService#getServerLocation : Region {} prAdvisor does not exist.", regionFullPath); |
| } |
| return null; |
| } |
| |
| // if (prAdvisor.getColocatedWith() != null) { |
| // prAdvisor = this.getClientPartitionAdvisor(prAdvisor.getColocatedWith()); |
| // if (prAdvisor == null) { |
| // if (this.logger.fineEnabled()) { |
| // this.logger.fine( |
| // "ClientMetadataService#getServerLocation : Region " |
| // + regionFullPath + "prAdvisor does not exist."); |
| // } |
| // return null; |
| // } |
| // } |
| |
| if (operation.isGet()) { |
| return prAdvisor.adviseServerLocation(bucketId); |
| } |
| else { |
| return prAdvisor.advisePrimaryServerLocation(bucketId); |
| } |
| } |
| |
| public Map<ServerLocation, HashSet> getServerToFilterMap( |
| final Collection routingKeys, final Region region, boolean primaryMembersNeeded |
| ) { |
| return getServerToFilterMap(routingKeys, region, primaryMembersNeeded, false); |
| } |
| |
| public Map<ServerLocation, HashSet> getServerToFilterMap( |
| final Collection routingKeys, final Region region, boolean primaryMembersNeeded, |
| boolean bucketsAsFilter) { |
| final String regionFullPath = region.getFullPath(); |
| ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath); |
| if (prAdvisor == null || prAdvisor.adviseRandomServerLocation() == null) { |
| scheduleGetPRMetaData((LocalRegion)region, false); |
| return null; |
| } |
| HashMap<Integer, HashSet> bucketToKeysMap = groupByBucketOnClientSide( |
| region, prAdvisor, routingKeys, bucketsAsFilter); |
| |
| HashMap<ServerLocation, HashSet> serverToKeysMap = new HashMap<ServerLocation, HashSet>(); |
| HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = groupByServerToBuckets( |
| prAdvisor, bucketToKeysMap.keySet(), primaryMembersNeeded); |
| |
| if(serverToBuckets == null){ |
| return null; |
| } |
| |
| for (Map.Entry entry : serverToBuckets.entrySet()) { |
| ServerLocation server = (ServerLocation)entry.getKey(); |
| HashSet<Integer> buckets = (HashSet)entry.getValue(); |
| for (Integer bucket : buckets) { |
| // use LinkedHashSet to maintain the order of keys |
| // the keys will be iterated several times |
| LinkedHashSet keys = (LinkedHashSet)serverToKeysMap.get(server); |
| if (keys == null) { |
| keys = new LinkedHashSet(); |
| } |
| keys.addAll(bucketToKeysMap.get(bucket)); |
| serverToKeysMap.put(server, keys); |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Returning server to keys map : {}", serverToKeysMap); |
| } |
| |
| return serverToKeysMap; |
| } |
| |
| public HashMap<ServerLocation, HashSet<Integer>> groupByServerToAllBuckets(Region region, boolean primaryOnly){ |
| final String regionFullPath = region.getFullPath(); |
| ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath); |
| if (prAdvisor == null || prAdvisor.adviseRandomServerLocation() == null) { |
| scheduleGetPRMetaData((LocalRegion)region, false); |
| return null; |
| } |
| int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets(); |
| HashSet<Integer> allBucketIds = new HashSet<Integer>(); |
| for(int i =0; i < totalNumberOfBuckets; i++){ |
| allBucketIds.add(i); |
| } |
| return groupByServerToBuckets(prAdvisor, allBucketIds, primaryOnly); |
| } |
| /** |
| * This function should make a map of server to buckets it is hosting. |
| * If for some bucket servers are not available due to mismatch in metadata |
| * it should fill up a random server for it. |
| */ |
| private HashMap<ServerLocation, HashSet<Integer>> groupByServerToBuckets( |
| ClientPartitionAdvisor prAdvisor, Set<Integer> bucketSet, |
| boolean primaryOnly) { |
| if (primaryOnly) { |
| HashMap<ServerLocation, HashSet<Integer>> serverToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>(); |
| for (Integer bucketId : bucketSet) { |
| ServerLocation server = prAdvisor.advisePrimaryServerLocation(bucketId); |
| if (server == null) { |
| //If we don't have the metadata for some buckets, return |
| //null, indicating that we don't have any metadata. This |
| //will cause us to use the non-single hop path. |
| return null; |
| } |
| HashSet<Integer> buckets = serverToBucketsMap.get(server); |
| if (buckets == null) { |
| buckets = new HashSet<Integer>(); // faster if this was an ArrayList |
| serverToBucketsMap.put(server, buckets); |
| } |
| buckets.add(bucketId); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("ClientMetadataService: The server to bucket map is : {}", serverToBucketsMap); |
| } |
| |
| return serverToBucketsMap; |
| } |
| else { |
| return pruneNodes(prAdvisor, bucketSet); |
| } |
| } |
| |
| |
| private HashMap<ServerLocation, HashSet<Integer>> pruneNodes( |
| ClientPartitionAdvisor prAdvisor, Set<Integer> buckets) { |
| |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("ClientMetadataService: The buckets to be pruned are: {}", buckets); |
| } |
| HashMap<ServerLocation, HashSet<Integer>> serverToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>(); |
| HashMap<ServerLocation, HashSet<Integer>> prunedServerToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>(); |
| |
| for (Integer bucketId : buckets) { |
| List<BucketServerLocation66> serversList = prAdvisor |
| .adviseServerLocations(bucketId); |
| if (isDebugEnabled) { |
| logger.debug("ClientMetadataService: For bucketId {} the server list is {}", bucketId, serversList); |
| } |
| if (serversList == null || serversList.size() == 0) { |
| //If we don't have the metadata for some buckets, return |
| //null, indicating that we don't have any metadata. This |
| //will cause us to use the non-single hop path. |
| return null; |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("ClientMetadataService: The buckets owners of the bucket: {} are: {}", bucketId, serversList); |
| } |
| |
| for (ServerLocation server : serversList) { |
| if (serverToBucketsMap.get(server) == null) { |
| HashSet<Integer> bucketSet = new HashSet<Integer>(); |
| bucketSet.add(bucketId); |
| serverToBucketsMap.put(server, bucketSet); |
| } |
| else { |
| HashSet<Integer> bucketSet = serverToBucketsMap.get(server); |
| bucketSet.add(bucketId); |
| serverToBucketsMap.put(server, bucketSet); |
| } |
| } |
| } |
| if (isDebugEnabled) { |
| logger.debug("ClientMetadataService: The server to buckets map is : {}", serverToBucketsMap); |
| } |
| |
| HashSet<Integer> currentBucketSet = new HashSet<Integer>(); |
| // ServerLocation randomFirstServer = |
| // prAdvisor.adviseRandomServerLocation(); // get a random server here |
| ServerLocation randomFirstServer = null; |
| if (serverToBucketsMap.isEmpty()) { |
| return null; |
| } |
| else { |
| int size = serverToBucketsMap.size(); |
| randomFirstServer = (ServerLocation)serverToBucketsMap.keySet().toArray()[rand.nextInt(size)]; |
| } |
| HashSet<Integer> bucketSet = serverToBucketsMap.get(randomFirstServer); |
| if (isDebugEnabled) { |
| logger.debug("ClientMetadataService: Adding the server : {} which is random and buckets {} to prunedMap", randomFirstServer, bucketSet); |
| } |
| currentBucketSet.addAll(bucketSet); |
| prunedServerToBucketsMap.put(randomFirstServer, bucketSet); |
| serverToBucketsMap.remove(randomFirstServer); |
| |
| while (!currentBucketSet.equals(buckets)) { |
| ServerLocation server = findNextServer(serverToBucketsMap.entrySet(), |
| currentBucketSet); |
| if (server == null) { |
| // HashSet<Integer> rBuckets = prunedServerToBucketsMap |
| // .get(randomFirstServer); |
| // HashSet<Integer> remainingBuckets = new HashSet<Integer>(buckets); |
| // remainingBuckets.removeAll(currentBucketSet); |
| // rBuckets.addAll(remainingBuckets); |
| // prunedServerToBucketsMap.put(randomFirstServer, rBuckets); |
| break; |
| } |
| |
| HashSet<Integer> bucketSet2 = serverToBucketsMap.get(server); |
| bucketSet2.removeAll(currentBucketSet); |
| if(bucketSet2.isEmpty()) { |
| serverToBucketsMap.remove(server); |
| continue; |
| } |
| currentBucketSet.addAll(bucketSet2); |
| prunedServerToBucketsMap.put(server, bucketSet2); |
| if (isDebugEnabled) { |
| logger.debug("ClientMetadataService: Adding the server : {} and buckets {} to prunedServer.", server, bucketSet2); |
| } |
| serverToBucketsMap.remove(server); |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("ClientMetadataService: The final prunedServerToBucket calculated is : {}", prunedServerToBucketsMap); |
| } |
| |
| return prunedServerToBucketsMap; |
| } |
| |
| |
| private ServerLocation findNextServer( |
| Set<Map.Entry<ServerLocation, HashSet<Integer>>> entrySet, |
| HashSet<Integer> currentBucketSet) { |
| |
| ServerLocation server = null; |
| int max = -1; |
| ArrayList<ServerLocation> nodesOfEqualSize = new ArrayList<ServerLocation>(); |
| for (Map.Entry<ServerLocation, HashSet<Integer>> entry : entrySet) { |
| HashSet<Integer> buckets = new HashSet<Integer>(); |
| buckets.addAll(entry.getValue()); |
| buckets.removeAll(currentBucketSet); |
| |
| if (max < buckets.size()) { |
| max = buckets.size(); |
| server = entry.getKey(); |
| nodesOfEqualSize.clear(); |
| nodesOfEqualSize.add(server); |
| } |
| else if (max == buckets.size()){ |
| nodesOfEqualSize.add(server); |
| } |
| } |
| |
| //return node; |
| Random r = new Random(); |
| if(nodesOfEqualSize.size() > 0) |
| return nodesOfEqualSize.get(r.nextInt(nodesOfEqualSize.size())); |
| |
| return null; |
| } |
| |
| private HashMap<Integer, HashSet> groupByBucketOnClientSide(Region region, |
| ClientPartitionAdvisor prAdvisor, Collection routingKeys, boolean bucketsAsFilter) { |
| |
| HashMap<Integer, HashSet> bucketToKeysMap = new HashMap(); |
| int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets(); |
| Iterator i = routingKeys.iterator(); |
| while (i.hasNext()) { |
| Object key = i.next(); |
| int bucketId = bucketsAsFilter ? ((Integer)key).intValue() : |
| extractBucketID(region, prAdvisor, totalNumberOfBuckets, key); |
| HashSet bucketKeys = bucketToKeysMap.get(bucketId); |
| if (bucketKeys == null) { |
| bucketKeys = new HashSet(); // faster if this was an ArrayList |
| bucketToKeysMap.put(bucketId, bucketKeys); |
| } |
| bucketKeys.add(key); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Bucket to keys map : {}", bucketToKeysMap); |
| } |
| return bucketToKeysMap; |
| } |
| |
| private int extractBucketID(Region region, ClientPartitionAdvisor prAdvisor, |
| int totalNumberOfBuckets, Object key) { |
| int bucketId = -1; |
| final PartitionResolver resolver = getResolver(region, key, null); |
| Object resolveKey; |
| EntryOperation entryOp = null; |
| if (resolver == null) { |
| // client has not registered PartitionResolver |
| // Assuming even PR at server side is not using PartitionResolver |
| resolveKey = key; |
| } |
| else { |
| entryOp = new EntryOperationImpl(region, |
| Operation.FUNCTION_EXECUTION, key, null, null); |
| resolveKey = resolver.getRoutingObject(entryOp); |
| if (resolveKey == null) { |
| throw new IllegalStateException( |
| LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL |
| .toLocalizedString()); |
| } |
| } |
| |
| if (resolver instanceof FixedPartitionResolver) { |
| if (entryOp == null) { |
| entryOp = new EntryOperationImpl(region, |
| Operation.FUNCTION_EXECUTION, key, null, null); |
| } |
| String partition = ((FixedPartitionResolver)resolver).getPartitionName( |
| entryOp, prAdvisor.getFixedPartitionNames()); |
| if (partition == null) { |
| Object[] prms = new Object[] { region.getName(), resolver }; |
| throw new IllegalStateException( |
| LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL |
| .toLocalizedString(prms)); |
| } |
| else { |
| bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey); |
| // This bucketid can be -1 in some circumstances where we don't have information about |
| // all the partition on the server. |
| // Do proactive scheduling of metadata fetch |
| if(bucketId == -1) { |
| scheduleGetPRMetaData((LocalRegion)region, true); |
| } |
| } |
| }else{ |
| bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets); |
| } |
| return bucketId; |
| } |
| |
| |
| |
| public void scheduleGetPRMetaData(final LocalRegion region, |
| final boolean isRecursive) { |
| if(this.nonPRs.contains(region.getFullPath())){ |
| return; |
| } |
| this.setMetadataStable(false); |
| region.getCachePerfStats().incNonSingleHopsCount(); |
| if (isRecursive) { |
| try { |
| getClientPRMetadata(region); |
| } |
| catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } |
| catch (Throwable e) { |
| SystemFailure.checkFailure(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("An exception occurred while fetching metadata", e); |
| } |
| } |
| } |
| else { |
| Runnable fetchTask = new Runnable() { |
| @SuppressWarnings("synthetic-access") |
| public void run() { |
| try { |
| getClientPRMetadata(region); |
| } |
| catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } |
| catch (Throwable e) { |
| SystemFailure.checkFailure(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("An exception occurred while fetching metadata", e); |
| } |
| } |
| } |
| }; |
| SingleHopClientExecutor.submitTask(fetchTask); |
| } |
| } |
| |
| public final void getClientPRMetadata(LocalRegion region) { |
| final String regionFullPath = region.getFullPath(); |
| ClientPartitionAdvisor advisor = null; |
| InternalPool pool = region.getServerProxy().getPool(); |
| // Acquires lock only if it is free, else a request to fetch meta data is in |
| // progress, so just return |
| if (region.clientMetaDataLock.tryLock()) { |
| try { |
| advisor = this.getClientPartitionAdvisor(regionFullPath); |
| if (advisor==null) { |
| advisor = GetClientPartitionAttributesOp |
| .execute(pool, regionFullPath); |
| if(advisor == null){ |
| this.nonPRs.add(regionFullPath); |
| return; |
| } |
| addClientPartitionAdvisor(regionFullPath, advisor); |
| } |
| else { |
| if(advisor.getFixedPAMap() != null && !advisor.isFPAAttrsComplete()) { |
| ClientPartitionAdvisor newAdvisor = GetClientPartitionAttributesOp |
| .execute(pool, regionFullPath); |
| advisor.updateFixedPAMap(newAdvisor.getFixedPAMap()); |
| } |
| } |
| String colocatedWith = advisor.getColocatedWith(); |
| if (colocatedWith == null) { |
| isMetadataRefreshed_TEST_ONLY = true; |
| GetClientPRMetaDataOp.execute(pool, regionFullPath, this); |
| region.getCachePerfStats().incMetaDataRefreshCount(); |
| } |
| else { |
| ClientPartitionAdvisor colocatedAdvisor = this.getClientPartitionAdvisor(colocatedWith); |
| LocalRegion leaderRegion = (LocalRegion)region.getCache() |
| .getRegion(colocatedWith); |
| if (colocatedAdvisor == null) { |
| scheduleGetPRMetaData(leaderRegion, true); |
| return; |
| } |
| else { |
| isMetadataRefreshed_TEST_ONLY = true; |
| GetClientPRMetaDataOp.execute(pool, colocatedWith, this); |
| leaderRegion.getCachePerfStats().incMetaDataRefreshCount(); |
| } |
| } |
| } |
| finally { |
| region.clientMetaDataLock.unlock(); |
| } |
| } |
| } |
| |
| public void scheduleGetPRMetaData(final LocalRegion region, |
| final boolean isRecursive, byte nwHopType) { |
| if(this.nonPRs.contains(region.getFullPath())){ |
| return; |
| } |
| ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(region.getFullPath()); |
| if(advisor!= null && advisor.getServerGroup().length()!= 0 && HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP){ |
| if (logger.isDebugEnabled()) { |
| logger.debug("Scheduling metadata refresh : {}", nwHopType); |
| } |
| if(nwHopType == (byte)2){ |
| return; |
| } |
| } |
| region.getCachePerfStats().incNonSingleHopsCount(); |
| if (isRecursive) { |
| try { |
| getClientPRMetadata(region); |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable e) { |
| SystemFailure.checkFailure(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("An exception occurred while fetching metadata", e); |
| } |
| } |
| } else { |
| Runnable fetchTask = new Runnable() { |
| @SuppressWarnings("synthetic-access") |
| public void run() { |
| try { |
| getClientPRMetadata(region); |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable e) { |
| SystemFailure.checkFailure(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("An exception occurred while fetching metadata", e); |
| } |
| } |
| } |
| }; |
| SingleHopClientExecutor.submitTask(fetchTask); |
| } |
| } |
| |
| public void removeBucketServerLocation(ServerLocation serverLocation) { |
| Set<String> keys = getAllRegionFullPaths(); |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("ClientMetadataService removing a ServerLocation :{}{}", serverLocation, keys); |
| } |
| if (keys != null) { |
| for (String regionPath : keys) { |
| ClientPartitionAdvisor prAdvisor = this |
| .getClientPartitionAdvisor(regionPath); |
| if (isDebugEnabled) { |
| logger.debug("ClientMetadataService removing from {}{}", regionPath, prAdvisor); |
| } |
| if (prAdvisor != null) { |
| prAdvisor.removeBucketServerLocation(serverLocation); |
| } |
| } |
| } |
| } |
| |
| public byte getMetaDataVersion(Region region, Operation operation, |
| Object key, Object value, Object callbackArg) { |
| ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region |
| .getFullPath()); |
| if (prAdvisor == null) { |
| return 0; |
| } |
| |
| int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets(); |
| |
| final PartitionResolver resolver = getResolver(region, key, callbackArg); |
| Object resolveKey; |
| EntryOperation entryOp = null; |
| if (resolver == null) { |
| // client has not registered PartitionResolver |
| // Assuming even PR at server side is not using PartitionResolver |
| resolveKey = key; |
| } |
| else { |
| entryOp = new EntryOperationImpl(region, operation, key, |
| value, callbackArg); |
| resolveKey = resolver.getRoutingObject(entryOp); |
| if (resolveKey == null) { |
| throw new IllegalStateException( |
| LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL |
| .toLocalizedString()); |
| } |
| } |
| |
| int bucketId; |
| if (resolver instanceof FixedPartitionResolver) { |
| if (entryOp == null) { |
| entryOp = new EntryOperationImpl(region, |
| Operation.FUNCTION_EXECUTION, key, null, null); |
| } |
| String partition = ((FixedPartitionResolver)resolver).getPartitionName( |
| entryOp, prAdvisor.getFixedPartitionNames()); |
| if (partition == null) { |
| Object[] prms = new Object[] { region.getName(), resolver }; |
| throw new IllegalStateException( |
| LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL |
| .toLocalizedString(prms)); |
| } |
| else { |
| bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey); |
| } |
| }else { |
| bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets); |
| } |
| |
| BucketServerLocation66 bsl = (BucketServerLocation66)getPrimaryServerLocation( |
| region, bucketId); |
| if (bsl == null) { |
| return 0; |
| } |
| return bsl.getVersion(); |
| } |
| |
| private ServerLocation getPrimaryServerLocation(Region region, int bucketId) { |
| final String regionFullPath = region.getFullPath(); |
| ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath); |
| if (prAdvisor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("ClientMetadataService#getServerLocation : Region {} prAdvisor does not exist.", regionFullPath); |
| } |
| return null; |
| } |
| |
| if (prAdvisor.getColocatedWith() != null) { |
| prAdvisor = this.getClientPartitionAdvisor(prAdvisor.getColocatedWith()); |
| if (prAdvisor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("ClientMetadataService#getServerLocation : Region {} prAdvisor does not exist.", regionFullPath); |
| } |
| return null; |
| } |
| } |
| return prAdvisor.advisePrimaryServerLocation(bucketId); |
| } |
| |
| private void addClientPartitionAdvisor(String regionFullPath, |
| ClientPartitionAdvisor advisor) { |
| if (this.cache.isClosed() || this.clientPRAdvisors == null) { |
| return; |
| } |
| try { |
| this.clientPRAdvisors.put(regionFullPath, advisor); |
| if (advisor.getColocatedWith() != null) { |
| String parentRegionPath = advisor.getColocatedWith(); |
| Set<ClientPartitionAdvisor> colocatedAdvisors = this.colocatedPRAdvisors.get(parentRegionPath); |
| if(colocatedAdvisors == null){ |
| colocatedAdvisors = new CopyOnWriteArraySet<ClientPartitionAdvisor>(); |
| this.colocatedPRAdvisors.put(parentRegionPath, colocatedAdvisors); |
| } |
| colocatedAdvisors.add(advisor); |
| } |
| } |
| catch (Exception npe) { |
| // ignore, shutdown case |
| } |
| |
| } |
| |
| public ClientPartitionAdvisor getClientPartitionAdvisor(String regionFullPath) { |
| if (this.cache.isClosed() || this.clientPRAdvisors == null) { |
| return null; |
| } |
| ClientPartitionAdvisor prAdvisor = null; |
| try { |
| prAdvisor = this.clientPRAdvisors.get(regionFullPath); |
| } |
| catch (Exception npe) { |
| return null; |
| } |
| return prAdvisor; |
| } |
| |
| public Set<ClientPartitionAdvisor> getColocatedClientPartitionAdvisor(String regionFullPath) { |
| if (this.cache.isClosed() || this.clientPRAdvisors == null || this.colocatedPRAdvisors == null) { |
| return null; |
| } |
| return this.colocatedPRAdvisors.get(regionFullPath); |
| } |
| |
| private Set<String> getAllRegionFullPaths() { |
| if (this.cache.isClosed() || this.clientPRAdvisors == null) { |
| return null; |
| } |
| Set<String> keys = null; |
| try { |
| keys = this.clientPRAdvisors.keySet(); |
| } |
| catch (Exception npe) { |
| return null; |
| } |
| return keys; |
| } |
| |
| public void close() { |
| this.clientPRAdvisors.clear(); |
| this.colocatedPRAdvisors.clear(); |
| } |
| |
| public boolean isRefreshMetadataTestOnly() { |
| return isMetadataRefreshed_TEST_ONLY; |
| } |
| |
| public void satisfyRefreshMetadata_TEST_ONLY(boolean isRefreshMetadataTestOnly) { |
| isMetadataRefreshed_TEST_ONLY = isRefreshMetadataTestOnly; |
| } |
| |
| public Map<String, ClientPartitionAdvisor> getClientPRMetadata_TEST_ONLY() { |
| return clientPRAdvisors; |
| } |
| |
| public Map<String, ClientPartitionAdvisor> getClientPartitionAttributesMap() { |
| return clientPRAdvisors; |
| } |
| |
| public boolean honourServerGroup(){ |
| return HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP; |
| } |
| |
| public boolean isMetadataStable() { |
| return isMetadataStable; |
| } |
| |
| public void setMetadataStable(boolean isMetadataStable) { |
| this.isMetadataStable = isMetadataStable; |
| } |
| |
| private boolean isMetadataRefreshed_TEST_ONLY = false; |
| |
| } |