| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache.partitioned.rebalance; |
| |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.cache.partition.PartitionMemberInfo; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.internal.Assert; |
| import com.gemstone.gemfire.internal.cache.FixedPartitionAttributesImpl; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.partitioned.InternalPartitionDetails; |
| import com.gemstone.gemfire.internal.cache.partitioned.OfflineMemberDetails; |
| import com.gemstone.gemfire.internal.cache.partitioned.PRLoad; |
| import com.gemstone.gemfire.internal.cache.partitioned.PartitionMemberInfoImpl; |
| import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| |
/**
 * A model of the load on all of the members for a partitioned region. This
 * model is used to find the best members to create buckets on or to move
 * buckets or primaries to. All of the actual work of creating a copy, moving a
 * primary, etc. is performed by the BucketOperator that is passed to the
 * constructor.
 *
 * To use, create a model and populate it using the addRegion method. addRegion
 * takes a region argument, to indicate which region the data is for. All of the
 * regions added to a single model are assumed to be colocated, and the model
 * adds together the load from each of the individual regions to balance all of
 * the regions together.
 *
 * Rebalancing operations are performed by repeatedly calling model.nextStep
 * until it returns false. Each call to nextStep should perform another
 * operation. The model will make callbacks to the BucketOperator you provide to
 * the constructor to perform the actual create or move.
 *
 * While creating redundant copies or moving buckets, this model tries to
 * minimize the standard deviation in the weighted loads for the members. The
 * weighted load for a member is the sum of the load for all of the buckets on
 * the member divided by that member's weight.
 *
 * This model is not threadsafe.
 *
 * @author dsmith
 * @since 6.0
 *
 */
| @SuppressWarnings("synthetic-access") |
| public class PartitionedRegionLoadModel { |
| private static Logger logger = LogService.getLogger(); |
| |
| /** |
| * A comparator that is used to sort buckets in the order |
| * that we should satisfy redundancy - most needy buckets first. |
| */ |
| private static final Comparator<Bucket> REDUNDANCY_COMPARATOR = new Comparator<Bucket>() { |
| public int compare(Bucket o1, Bucket o2) { |
| // put the buckets with the lowest redundancy first |
| int result = o1.getRedundancy() - o2.getRedundancy(); |
| if (result == 0) { |
| // put the bucket with the largest load first. This should give us a |
| // better chance of finding a place to put it |
| result = Float.compare(o2.getLoad(), o1.getLoad()); |
| } |
| if (result == 0) { |
| // finally, just use the id so the comparator doesn't swallow buckets |
| // with the same load |
| result = o1.getId() - o2.getId(); |
| } |
| |
| return result; |
| } |
| }; |
| |
| private static final long MEGABYTES = 1024 * 1024; |
| |
| /** |
| * A member to represent inconsistent data. For example, if two members think |
| * they are the primary for a bucket, we will set the primary to invalid, so it won't |
| * be a candidate for rebalancing. |
| */ |
| final MemberRollup INVALID_MEMBER = new MemberRollup(null, false, false); |
| |
| private final BucketRollup[] buckets; |
| |
| /** |
| * A map of all members that host this partitioned region |
| */ |
| private final Map<InternalDistributedMember, MemberRollup> members = new HashMap<InternalDistributedMember, MemberRollup>(); |
| |
| /** |
| * The set of all regions that are colocated in this model. |
| */ |
| private final Set<String> allColocatedRegions= new HashSet<String>(); |
| |
| /** |
| * The list of buckets that have low redundancy |
| */ |
| private SortedSet<BucketRollup> lowRedundancyBuckets= null; |
| private SortedSet<BucketRollup> overRedundancyBuckets= null; |
| private final Collection<Move> attemptedPrimaryMoves = new HashSet<Move>(); |
| private final Collection<Move> attemptedBucketMoves = new HashSet<Move>(); |
| private final Collection<Move> attemptedBucketCreations = new HashSet<Move>(); |
| private final Collection<Move> attemptedBucketRemoves = new HashSet<Move>(); |
| |
| private final BucketOperator operator; |
| private final int requiredRedundancy; |
| |
| /**The average primary load on a member */ |
| private float primaryAverage = -1; |
| /**The average bucket load on a member */ |
| private float averageLoad = -1; |
| /** |
| * The minimum improvement in variance that we'll consider worth moving a |
| * primary |
| */ |
| private double minPrimaryImprovement = -1; |
| /** |
| * The minimum improvement in variance that we'll consider worth moving a |
| * bucket |
| */ |
| private double minImprovement = -1; |
| |
| private final AddressComparor addressComparor; |
| |
| private final Set<InternalDistributedMember> criticalMembers; |
| |
| private final PartitionedRegion partitionedRegion; |
| |
| |
/**
 * Create a new, empty model. Regions are added afterwards with addRegion.
 * @param operator the operator which performs the actual creates/moves for buckets
 * @param redundancyLevel The expected redundancy level for the region
 * @param numBuckets the length of the bucket rollup array
 * @param addressComparor answers enforceUniqueZones() for this model
 * @param criticalMembers members to flag as critical when they are added
 * @param region the partitioned region this model is built for
 */
public PartitionedRegionLoadModel(BucketOperator operator,
    int redundancyLevel, int numBuckets, AddressComparor addressComparor,
    Set<InternalDistributedMember> criticalMembers, PartitionedRegion region) {
  this.partitionedRegion = region;
  this.operator = operator;
  this.addressComparor = addressComparor;
  this.criticalMembers = criticalMembers;
  this.requiredRedundancy = redundancyLevel;
  this.buckets = new BucketRollup[numBuckets];
}
| |
| /** |
| * Add a region to the model. All regions that are added are assumed to be colocated. |
| * The first region added to the model should be the parent region. The parent region |
| * is expected to have at least as many members as child regions; it may have more. If |
| * the parent has more members than child regions those members will be considered invalid. |
| * @param region |
| * @param memberDetailSet A set of details about each member. |
| * @param offlineDetails |
| */ |
| public void addRegion(String region, |
| Collection<? extends InternalPartitionDetails> memberDetailSet, |
| OfflineMemberDetails offlineDetails, |
| boolean enforceLocalMaxMemory) { |
| this.allColocatedRegions.add(region); |
| //build up a list of members and an array of buckets for this |
| //region. Each bucket has a reference to all of the members |
| //that host it and each member has a reference to all of the buckets |
| //it hosts |
| Map<InternalDistributedMember, Member> regionMember = new HashMap<InternalDistributedMember, Member>(); |
| Bucket[] regionBuckets = new Bucket[this.buckets.length]; |
| for(InternalPartitionDetails memberDetails : memberDetailSet) { |
| InternalDistributedMember memberId = (InternalDistributedMember) memberDetails.getDistributedMember(); |
| |
| boolean isCritical = criticalMembers.contains(memberId); |
| Member member = new Member(memberId, |
| memberDetails.getPRLoad().getWeight(), memberDetails.getConfiguredMaxMemory(), isCritical, enforceLocalMaxMemory); |
| regionMember.put(memberId, member); |
| |
| PRLoad load = memberDetails.getPRLoad(); |
| for(int i = 0; i < regionBuckets.length; i++) { |
| if(load.getReadLoad(i) > 0) { |
| Bucket bucket = regionBuckets[i]; |
| if(bucket == null) { |
| Set<PersistentMemberID> offlineMembers = offlineDetails.getOfflineMembers(i); |
| bucket = new Bucket(i, load.getReadLoad(i), memberDetails.getBucketSize(i), offlineMembers); |
| regionBuckets[i] = bucket; |
| } |
| bucket.addMember(member); |
| if(load.getWriteLoad(i) > 0) { |
| if(bucket.getPrimary() == null) { |
| bucket.setPrimary(member, load.getWriteLoad(i)); |
| } else if(!bucket.getPrimary().equals(member)) { |
| bucket.setPrimary(INVALID_MEMBER, 1); |
| } |
| } |
| } |
| } |
| } |
| |
| //add each member for this region to a rollup of all colocated |
| //regions |
| for(Member member: regionMember.values()) { |
| InternalDistributedMember memberId = member.getDistributedMember(); |
| MemberRollup memberSum = this.members.get(memberId); |
| boolean isCritical = criticalMembers.contains(memberId); |
| if(memberSum == null) { |
| memberSum = new MemberRollup(memberId, isCritical, enforceLocalMaxMemory); |
| this.members.put(memberId, memberSum); |
| } |
| |
| memberSum.addColocatedMember(region, member); |
| } |
| |
| //Now, add the region to the rollups of the colocated |
| //regions and buckets |
| for(int i =0; i < this.buckets.length; i++) { |
| if(regionBuckets[i] == null) { |
| //do nothing, this bucket is not hosted for this region. |
| // [sumedh] remove from buckets array too to be consistent since |
| // this method will be invoked repeatedly for all colocated regions, |
| // and then we may miss some colocated regions for a bucket leading |
| // to all kinds of issues later (e.g. see SQLF test for #41472 that |
| // shows some problems including NPEs, hangs etc.) |
| this.buckets[i] = null; |
| continue; |
| } |
| if(this.buckets[i]==null) { |
| //If this is the first region we have seen that is hosting this bucket, create a bucket rollup |
| this.buckets[i] = new BucketRollup(i); |
| } |
| |
| //Add all of the members hosting the bucket to the rollup |
| for(Member member: regionBuckets[i].getMembersHosting()) { |
| InternalDistributedMember memberId = member.getDistributedMember(); |
| this.buckets[i].addMember(this.members.get(memberId)); |
| } |
| |
| //set the primary for the rollup |
| if(regionBuckets[i].getPrimary() != null) { |
| if(this.buckets[i].getPrimary() == null) { |
| InternalDistributedMember memberId = regionBuckets[i].getPrimary().getDistributedMember(); |
| this.buckets[i].setPrimary(this.members.get(memberId), 0); |
| } |
| else{ |
| if(!(this.buckets[i].getPrimary() == INVALID_MEMBER)){ |
| if (!this.buckets[i].getPrimary().getDistributedMember().equals( |
| regionBuckets[i].getPrimary().getDistributedMember())) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("PartitionedRegionLoadModel - Setting bucket {} to INVALID because it is the primary on two members.This could just be a race in the collocation of data. member1={} member2={}", |
| this.buckets[i], this.buckets[i].getPrimary(), regionBuckets[i].getPrimary()); |
| } |
| this.buckets[i].setPrimary(INVALID_MEMBER, 0); |
| } |
| } |
| } |
| } |
| this.buckets[i].addColocatedBucket(region, regionBuckets[i]); |
| } |
| |
| //TODO rebalance - there is a possibility of adding members |
| //back here, which I don't like. I think maybe all of the regions should be in the |
| //constructor for the load model, and then when the constructor is done |
| //we can do with validation. |
| //If any members don't have this new region, remove them. |
| for(Iterator<Entry<InternalDistributedMember, MemberRollup>> itr = members.entrySet().iterator();itr.hasNext();) { |
| MemberRollup memberRollup = itr.next().getValue(); |
| if(!memberRollup.getColocatedMembers().keySet().equals(this.allColocatedRegions)) { |
| itr.remove(); |
| if(logger.isDebugEnabled()) { |
| logger.debug("PartitionedRegionLoadModel - removing member {} from the consideration because it doesn't have all of the colocated regions. Expected={}, was={}", |
| memberRollup, allColocatedRegions, memberRollup.getColocatedMembers()); |
| } |
| //This state should never happen |
| if(!memberRollup.getBuckets().isEmpty()) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.PartitionedRegionLoadModel_INCOMPLETE_COLOCATION, |
| new Object[] { memberRollup, this.allColocatedRegions, memberRollup.getColocatedMembers().keySet(), memberRollup.getBuckets() })); |
| } |
| for(Bucket bucket: new HashSet<Bucket>(memberRollup.getBuckets())) { |
| bucket.removeMember(memberRollup); |
| } |
| } |
| } |
| } |
| |
| public void initialize() { |
| resetAverages(); |
| initOverRedundancyBuckets(); |
| initLowRedundancyBuckets(); |
| } |
| |
| public SortedSet<BucketRollup> getLowRedundancyBuckets() { |
| return lowRedundancyBuckets; |
| } |
| |
| public SortedSet<BucketRollup> getOverRedundancyBuckets() { |
| return overRedundancyBuckets; |
| } |
| |
| public void setOverRedundancyBuckets( |
| SortedSet<BucketRollup> overRedundancyBuckets) { |
| this.overRedundancyBuckets = overRedundancyBuckets; |
| } |
| |
| public boolean enforceUniqueZones() { |
| return addressComparor.enforceUniqueZones(); |
| } |
| |
| public void ignoreLowRedundancyBucket(BucketRollup first) { |
| this.lowRedundancyBuckets.remove(first); |
| } |
| |
| public void ignoreOverRedundancyBucket(BucketRollup first) { |
| this.overRedundancyBuckets.remove(first); |
| } |
| |
| public MemberRollup getMember(InternalDistributedMember target) { |
| return members.get(target); |
| } |
| |
| public BucketRollup[] getBuckets() { |
| return buckets; |
| } |
| |
| public String getName() { |
| return getPartitionedRegion().getFullPath(); |
| } |
| |
| public PartitionedRegion getPartitionedRegion() { |
| //TODO - this model really should not have |
| //a reference to the partitioned region object. |
| //The fixed PR code currently depends on this |
| //partitioned region object and needs |
| //refactoring. |
| return partitionedRegion; |
| } |
| |
| private Map<String, Long> getColocatedRegionSizes(BucketRollup bucket) { |
| Map<String, Long> colocatedRegionSizes = new HashMap<String, Long>(); |
| |
| for (Map.Entry<String, Bucket> entry : bucket.getColocatedBuckets() |
| .entrySet()) { |
| colocatedRegionSizes.put(entry.getKey(), |
| Long.valueOf(entry.getValue().getBytes())); |
| } |
| return colocatedRegionSizes; |
| } |
| |
| public void createRedundantBucket(BucketRollup bucket, |
| Member targetMember) { |
| Map<String, Long> colocatedRegionSizes = getColocatedRegionSizes(bucket); |
| Move move = new Move(null, targetMember, bucket); |
| |
| if(!this.operator.createRedundantBucket(targetMember.getMemberId(), bucket.getId(), colocatedRegionSizes)) { |
| this.attemptedBucketCreations.add(move); |
| } else { |
| this.lowRedundancyBuckets.remove(bucket); |
| bucket.addMember(targetMember); |
| //put the bucket back into the list if we still need to satisfy redundancy for |
| //this bucket |
| if(bucket.getRedundancy() < this.requiredRedundancy) { |
| this.lowRedundancyBuckets.add(bucket); |
| } |
| resetAverages(); |
| } |
| } |
| |
| |
| |
| protected void remoteOverRedundancyBucket(BucketRollup bucket, |
| Member targetMember) { |
| |
| Move bestMove = new Move(null, targetMember, bucket); |
| Map<String, Long> colocatedRegionSizes = getColocatedRegionSizes(bucket); |
| |
| if(!this.operator.removeBucket(targetMember.getMemberId(), bucket.getId(), colocatedRegionSizes)) { |
| this.attemptedBucketRemoves.add(bestMove); |
| } else { |
| this.overRedundancyBuckets.remove(bucket); |
| bucket.removeMember(targetMember); |
| //put the bucket back into the list if we still need to satisfy redundancy for |
| //this bucket |
| if(bucket.getOnlineRedundancy() > this.requiredRedundancy) { |
| this.overRedundancyBuckets.add(bucket); |
| } |
| resetAverages(); |
| } |
| } |
| |
| private void initLowRedundancyBuckets() { |
| this.lowRedundancyBuckets = new TreeSet<BucketRollup>(REDUNDANCY_COMPARATOR); |
| for(BucketRollup b: this.buckets) { |
| if(b != null && b.getRedundancy() >= 0 && b.getRedundancy() < this.requiredRedundancy) { |
| this.lowRedundancyBuckets.add(b); |
| } |
| } |
| } |
| |
| private void initOverRedundancyBuckets() { |
| this.overRedundancyBuckets = new TreeSet<BucketRollup>(REDUNDANCY_COMPARATOR); |
| for(BucketRollup b: this.buckets) { |
| if(b != null && b.getOnlineRedundancy() > this.requiredRedundancy) { |
| this.overRedundancyBuckets.add(b); |
| } |
| } |
| } |
| |
| /** |
| * Find the best member to put a new bucket on. |
| * |
| * @param bucket |
| * the bucket we want to create |
| * @param checkIPAddress |
| * true if we should only consider members that do not have the same |
| * IP Address as a member that already hosts the bucket |
| */ |
| public Move findBestTarget(Bucket bucket, boolean checkIPAddress) { |
| float leastCost = Float.MAX_VALUE; |
| Move bestMove = null; |
| |
| for (Member member : this.members.values()) { |
| if (member.willAcceptBucket(bucket, null, checkIPAddress).willAccept()) { |
| float cost = (member.getTotalLoad() + bucket.getLoad()) |
| / member.getWeight(); |
| if (cost < leastCost) { |
| Move move = new Move(null, member, bucket); |
| if (!this.attemptedBucketCreations.contains(move)) { |
| leastCost = cost; |
| bestMove = move; |
| } |
| } |
| } |
| } |
| return bestMove; |
| } |
| |
| /** |
| * Find the best member to remove a bucket from |
| * |
| * @param bucket |
| * the bucket we want to create |
| */ |
| public Move findBestRemove(Bucket bucket) { |
| float mostLoaded = Float.MIN_VALUE; |
| Move bestMove = null; |
| |
| for (Member member : bucket.getMembersHosting()) { |
| float newLoad = (member.getTotalLoad() - bucket.getLoad()) |
| / member.getWeight(); |
| if (newLoad > mostLoaded && ! member.equals(bucket.getPrimary())) { |
| Move move = new Move(null, member, bucket); |
| if (!this.attemptedBucketRemoves.contains(move)) { |
| mostLoaded = newLoad; |
| bestMove = move; |
| } |
| } |
| } |
| return bestMove; |
| } |
| |
| public Move findBestTargetForFPR(Bucket bucket, boolean checkIPAddress) { |
| Move noMove = null; |
| InternalDistributedMember targetMemberID = null; |
| Member targetMember = null; |
| List<FixedPartitionAttributesImpl> fpas = this.partitionedRegion |
| .getFixedPartitionAttributesImpl(); |
| |
| if (fpas != null) { |
| for (FixedPartitionAttributesImpl fpaImpl : fpas) { |
| if (fpaImpl.hasBucket(bucket.getId())) { |
| targetMemberID = this.partitionedRegion.getDistributionManager() |
| .getDistributionManagerId(); |
| if (this.members.containsKey(targetMemberID)) { |
| targetMember = this.members.get(targetMemberID); |
| if (targetMember.willAcceptBucket(bucket, null, checkIPAddress).willAccept()) { |
| // We should have just one move for creating |
| // all the buckets for a FPR on this node. |
| return new Move(null, targetMember, bucket); |
| } |
| } |
| } |
| } |
| } |
| |
| return noMove; |
| } |
| |
| protected boolean movePrimary(Move bestMove) { |
| Member bestSource = bestMove.getSource(); |
| Member bestTarget = bestMove.getTarget(); |
| Bucket bestBucket = bestMove.getBucket(); |
| boolean successfulMove = this.operator.movePrimary(bestSource.getDistributedMember(), bestTarget |
| .getDistributedMember(), bestBucket.getId()); |
| |
| if(successfulMove) { |
| bestBucket.setPrimary(bestTarget, bestBucket.getPrimaryLoad()); |
| } |
| |
| boolean entryAdded = this.attemptedPrimaryMoves.add(bestMove); |
| Assert |
| .assertTrue(entryAdded, |
| "PartitionedRegionLoadModel.movePrimarys - excluded set is not growing, so we probably would have an infinite loop here"); |
| |
| return successfulMove; |
| } |
| |
| public Move findBestPrimaryMove() { |
| Move bestMove= null; |
| double bestImprovement = 0; |
| GemFireCacheImpl cache = null; |
| for(Member source: this.members.values()) { |
| for(Bucket bucket: source.getPrimaryBuckets()) { |
| for(Member target: bucket.getMembersHosting()) { |
| if(source.equals(target)) { |
| continue; |
| } |
| // If node is not fully initialized yet, then skip this node |
| // (SQLFabric DDL replay in progress). |
| if (cache == null) { |
| cache = GemFireCacheImpl.getInstance(); |
| } |
| if (cache != null |
| && cache.isUnInitializedMember(target.getMemberId())) { |
| continue; |
| } |
| double improvement = improvement(source.getPrimaryLoad(), source |
| .getWeight(), target.getPrimaryLoad(), target.getWeight(), bucket.getPrimaryLoad(), |
| getPrimaryAverage()); |
| if (improvement > bestImprovement && improvement > getMinPrimaryImprovement()) { |
| Move move = new Move(source, target, bucket); |
| if (!this.attemptedPrimaryMoves.contains(move)) { |
| bestImprovement = improvement; |
| bestMove = move; |
| } |
| } |
| } |
| } |
| } |
| return bestMove; |
| } |
| |
| /** |
| * Move all primary from other to this |
| */ |
| private void makeFPRPrimaryForThisNode() { |
| List<FixedPartitionAttributesImpl> FPAs = this.partitionedRegion |
| .getFixedPartitionAttributesImpl(); |
| InternalDistributedMember targetId = this.partitionedRegion |
| .getDistributionManager().getId(); |
| Member target = this.members.get(targetId); |
| Set<Bucket> unsuccessfulAttempts = new HashSet<Bucket>(); |
| for (Bucket bucket : this.buckets) { |
| if (bucket != null) { |
| for (FixedPartitionAttributesImpl fpa : FPAs) { |
| if (fpa.hasBucket(bucket.id) && fpa.isPrimary()) { |
| Member source = bucket.primary; |
| bucket.getPrimary(); |
| if (source != target) { |
| // HACK: In case we don't know who is Primary at this time |
| // we just set source as target too for stat purposes |
| |
| InternalDistributedMember srcDM = (source == null || source == INVALID_MEMBER) ? target |
| .getDistributedMember() : source.getDistributedMember(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("PRLM#movePrimariesForFPR: For Bucket#{}, moving primary from source {} to target {}", |
| bucket.getId(), bucket.primary, target); |
| } |
| boolean successfulMove = this.operator.movePrimary(srcDM, |
| target.getDistributedMember(), bucket.getId()); |
| unsuccessfulAttempts.add(bucket); |
| // We have to move the primary otherwise there is some problem! |
| Assert.assertTrue(successfulMove, |
| " Fixed partitioned region not able to move the primary!"); |
| if (successfulMove) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("PRLM#movePrimariesForFPR: For Bucket#{}, moving primary source {} to target {}", |
| bucket.getId(), source, target); |
| } |
| |
| bucket.setPrimary(target, bucket.getPrimaryLoad()); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Calculate the target weighted number of primaries on each node. |
| */ |
| private float getPrimaryAverage() { |
| if(this.primaryAverage == -1) { |
| float totalWeight = 0; |
| float totalPrimaryCount = 0; |
| for(Member member : this.members.values()) { |
| totalPrimaryCount += member.getPrimaryLoad(); |
| totalWeight += member.getWeight(); |
| } |
| |
| this.primaryAverage = totalPrimaryCount / totalWeight; |
| } |
| |
| return this.primaryAverage; |
| } |
| |
| /** |
| * Calculate the target weighted amount of data on each node. |
| */ |
| private float getAverageLoad() { |
| if(this.averageLoad == -1) { |
| float totalWeight = 0; |
| float totalLoad = 0; |
| for(Member member : this.members.values()) { |
| totalLoad += member.getTotalLoad(); |
| totalWeight += member.getWeight(); |
| } |
| |
| this.averageLoad = totalLoad / totalWeight; |
| } |
| |
| return this.averageLoad; |
| } |
| |
| /** |
| * Calculate the minimum improvement in variance that will we consider worth |
| * while. Currently this is calculated as the improvement in variance that |
| * would occur by removing the smallest bucket from the member with the |
| * largest weight. |
| */ |
| private double getMinPrimaryImprovement() { |
| if((this.minPrimaryImprovement + 1.0) < .0000001) { // i.e. == -1 |
| float largestWeight = 0; |
| float smallestBucket = 0; |
| for(Member member : this.members.values()) { |
| if(member.getWeight() > largestWeight) { |
| largestWeight = member.getWeight(); |
| } |
| for(Bucket bucket: member.getPrimaryBuckets()) { |
| if(bucket.getPrimaryLoad() < smallestBucket || smallestBucket == 0) { |
| smallestBucket = bucket.getPrimaryLoad(); |
| } |
| } |
| } |
| double before = variance(getPrimaryAverage() * largestWeight |
| + smallestBucket, largestWeight, getPrimaryAverage()); |
| double after = variance(getPrimaryAverage() * largestWeight, |
| largestWeight, getPrimaryAverage()); |
| this.minPrimaryImprovement = (before - after) / smallestBucket; |
| } |
| return this.minPrimaryImprovement; |
| } |
| |
| /** |
| * Calculate the minimum improvement in variance that will we consider worth |
| * while. Currently this is calculated as the improvement in variance that |
| * would occur by removing the smallest bucket from the member with the |
| * largest weight. |
| */ |
| private double getMinImprovement() { |
| if((this.minImprovement + 1.0) < .0000001) { // i.e. == -1 |
| float largestWeight = 0; |
| float smallestBucket = 0; |
| for(Member member : this.members.values()) { |
| if(member.getWeight() > largestWeight) { |
| largestWeight = member.getWeight(); |
| } |
| //find the smallest bucket, ignoring empty buckets. |
| for(Bucket bucket: member.getBuckets()) { |
| if(smallestBucket == 0 || (bucket.getLoad() < smallestBucket && bucket.getBytes() > 0) ) { |
| smallestBucket = bucket.getLoad(); |
| } |
| } |
| } |
| double before = variance(getAverageLoad() * largestWeight |
| + smallestBucket, largestWeight, getAverageLoad()); |
| double after = variance(getAverageLoad() * largestWeight, |
| largestWeight, getAverageLoad()); |
| this.minImprovement = (before - after) / smallestBucket; |
| } |
| return this.minImprovement; |
| } |
| |
| private void resetAverages() { |
| this.primaryAverage = -1; |
| this.averageLoad = -1; |
| this.minPrimaryImprovement = -1; |
| this.minImprovement = -1; |
| } |
| |
| /** |
| * Calculate how much the variance in load will decrease for a given move. |
| * |
| * @param sLoad |
| * the current load on the source member |
| * @param sWeight |
| * the weight of the source member |
| * @param tLoad |
| * the current load on the target member |
| * @param tWeight |
| * the weight of the target member |
| * @param bucketSize |
| * the size of the bucket we're considering moving |
| * @param average |
| * the target weighted load for all members. |
| * @return the change in variance that would occur by making this move. |
| * Essentially variance_before - variance_after, so a positive change |
| * is a means the variance is decreasing. |
| */ |
| private double improvement(float sLoad, float sWeight, |
| float tLoad, float tWeight, float bucketSize, float average) { |
| |
| double vSourceBefore = variance(sLoad, sWeight, average); |
| double vSourceAfter = variance(sLoad - bucketSize, sWeight, average); |
| double vTargetBefore = variance(tLoad, tWeight, average); |
| double vTargetAfter = variance(tLoad + bucketSize, tWeight, average); |
| |
| double improvement = vSourceBefore - vSourceAfter + vTargetBefore - vTargetAfter; |
| return improvement / bucketSize; |
| } |
| |
| private double variance(double load, double weight, double average) { |
| double deviation = (load / weight - average) ; |
| return deviation * deviation; |
| } |
| |
| public Move findBestBucketMove() { |
| Move bestMove= null; |
| double bestImprovement = 0; |
| for(Member source: this.members.values()) { |
| for(Bucket bucket: source.getBuckets()) { |
| for(Member target: this.members.values()) { |
| if(bucket.getMembersHosting().contains(target)) { |
| continue; |
| } |
| if(!target.willAcceptBucket(bucket, source, true).willAccept()) { |
| continue; |
| } |
| double improvement = improvement(source.getTotalLoad(), source |
| .getWeight(), target.getTotalLoad(), target.getWeight(), bucket.getLoad(), |
| getAverageLoad()); |
| if (improvement > bestImprovement && improvement > getMinImprovement()) { |
| Move move = new Move(source, target, bucket); |
| if(!this.attemptedBucketMoves.contains(move)) { |
| bestImprovement = improvement; |
| bestMove = move; |
| } |
| } |
| } |
| } |
| } |
| return bestMove; |
| } |
| |
| protected boolean moveBucket(Move bestMove) { |
| Member bestSource = bestMove.getSource(); |
| Member bestTarget = bestMove.getTarget(); |
| BucketRollup bestBucket = (BucketRollup) bestMove.getBucket(); |
| |
| Map<String, Long> colocatedRegionSizes = getColocatedRegionSizes(bestBucket); |
| |
| boolean successfulMove = this.operator.moveBucket(bestSource.getDistributedMember(), bestTarget |
| .getDistributedMember(), bestBucket.getId(), colocatedRegionSizes); |
| |
| if(successfulMove) { |
| bestBucket.addMember(bestTarget); |
| if(bestSource.equals(bestBucket.getPrimary())) { |
| bestBucket.setPrimary(bestTarget, bestBucket.getPrimaryLoad()); |
| } |
| bestBucket.removeMember(bestSource); |
| } |
| |
| boolean entryAdded = this.attemptedBucketMoves.add(bestMove); |
| Assert |
| .assertTrue(entryAdded, |
| "PartitionedRegionLoadModel.moveBuckets - excluded set is not growing, so we probably would have an infinite loop here"); |
| |
| return successfulMove; |
| } |
| |
| /** |
| * Return a snapshot of what the partitioned member details look like. |
| * @return a set of partitioned member details. |
| */ |
| public Set<PartitionMemberInfo> getPartitionedMemberDetails(String region) { |
| TreeSet<PartitionMemberInfo> result = new TreeSet<PartitionMemberInfo>(); |
| for(MemberRollup member: this.members.values()) { |
| Member colocatedMember = member.getColocatedMember(region); |
| if(colocatedMember != null) { |
| result.add(new PartitionMemberInfoImpl(colocatedMember |
| .getDistributedMember(), colocatedMember |
| .getConfiguredMaxMemory(), colocatedMember.getSize(), |
| colocatedMember.getBucketCount(), colocatedMember |
| .getPrimaryCount())); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * For testing only, calculate the total |
| * variance of the members |
| */ |
| public float getVarianceForTest() { |
| float variance = 0; |
| |
| for(Member member: this.members.values()) { |
| variance += variance(member.getTotalLoad(), member.getWeight(), getAverageLoad()); |
| } |
| |
| return variance; |
| } |
| |
| /** |
| * For testing only, calculate the total |
| * variance of the members |
| */ |
| public float getPrimaryVarianceForTest() { |
| float variance = 0; |
| |
| for(Member member: this.members.values()) { |
| variance += variance(member.getPrimaryLoad(), member.getWeight(), getPrimaryAverage()); |
| } |
| |
| return variance; |
| } |
| |
| /** |
|  * Renders the model as a text table: a header row listing every bucket id, |
|  * one row per member showing its primary load, size and configured max |
|  * memory (both divided by MEGABYTES) followed by one symbol per bucket, |
|  * and a final "#offline" row with the number of offline members per bucket. |
|  * |
|  * @return the table, or the empty string when the model has no members |
|  */ |
| @Override |
| public String toString() { |
|   if (this.members.isEmpty()) { |
|     return ""; |
|   } |
| |
|   // Bucket is Comparable by id, so the natural ordering gives the column |
|   // order directly; this avoids a comparator that subtracts ids, which is |
|   // the classic overflow-prone way to compare ints. |
|   SortedSet<Bucket> allBucketIds = new TreeSet<Bucket>(); |
|   int longestMemberId = 0; |
|   for (Member member : this.members.values()) { |
|     allBucketIds.addAll(member.getBuckets()); |
|     int memberIdLength = member.getDistributedMember().toString().length(); |
|     longestMemberId = Math.max(longestMemberId, memberIdLength); |
|   } |
| |
|   StringBuilder result = new StringBuilder(); |
| |
|   // Header row: member id column padded to the widest id, then the bucket ids. |
|   result.append(String.format("%" + longestMemberId + "s primaries size(MB) max(MB)", "MemberId")); |
|   for (Bucket bucket : allBucketIds) { |
|     result.append(String.format("%4s", bucket.getId())); |
|   } |
| |
|   // One row per member. Per bucket: P = member holds the primary, |
|   // R = member hosts the bucket but not as primary, X = not hosted. |
|   for (Member member : this.members.values()) { |
|     result.append(String.format("\n%" + longestMemberId + "s %9.0f %8.2f %8.2f", |
|         member.getDistributedMember(), |
|         member.getPrimaryLoad(), |
|         member.getSize() / (float) MEGABYTES, |
|         member.getConfiguredMaxMemory() / (float) MEGABYTES)); |
|     for (Bucket bucket : allBucketIds) { |
|       char symbol; |
|       if (member.getPrimaryBuckets().contains(bucket)) { |
|         symbol = 'P'; |
|       } else if (member.getBuckets().contains(bucket)) { |
|         symbol = 'R'; |
|       } else { |
|         symbol = 'X'; |
|       } |
|       result.append(" ").append(symbol); |
|     } |
|   } |
| |
|   // Footer row: the format string has a single %s, so only the label is passed. |
|   result.append(String.format("\n%" + longestMemberId + "s ", "#offline")); |
|   for (Bucket bucket : allBucketIds) { |
|     result.append(String.format("%4s", bucket.getOfflineMembers().size())); |
|   } |
| |
|   return result.toString(); |
| } |
| |
| /** |
| * Represents the sum of all of the colocated regions on a given |
| * member. Also, holds a map of all of the colocated regions |
| * hosted on this member. |
| */ |
| private class MemberRollup extends Member { |
|   /** The per-region members folded into this rollup, keyed by the region string. */ |
|   private final Map<String, Member> colocatedMembers = new HashMap<String, Member>(); |
|   /** Constant flag backing {@link #isInvalid()}; nothing in this class changes it. */ |
|   private final boolean invalid = false; |
| |
|   public MemberRollup(InternalDistributedMember memberId, boolean isCritical, boolean enforceLocalMaxMemory) { |
|     super(memberId, isCritical, enforceLocalMaxMemory); |
|   } |
| |
|   /** |
|    * Indicates that this member doesn't have all of the colocated regions. |
|    * The backing field is a constant false, so this always returns false here. |
|    */ |
|   public boolean isInvalid() { |
|     return invalid; |
|   } |
| |
|   /** |
|    * Registers the member for one colocated region and folds its weight and |
|    * local max memory into this rollup. |
|    * |
|    * @return false (and no change) if the region was already registered |
|    */ |
|   public boolean addColocatedMember(String region, Member member) { |
|     Map<String, Member> byRegion = getColocatedMembers(); |
|     if (byRegion.containsKey(region)) { |
|       return false; |
|     } |
|     byRegion.put(region, member); |
|     this.weight += member.weight; |
|     this.localMaxMemory += member.localMaxMemory; |
|     return true; |
|   } |
| |
|   /** |
|    * @return the member registered for the given region, or null if none |
|    */ |
|   public Member getColocatedMember(String region) { |
|     return getColocatedMembers().get(region); |
|   } |
| |
|   /** |
|    * Adjusts the totals of this rollup after one of the bucket rollups it |
|    * hosts changed size. |
|    */ |
|   public void updateLoad(float load, float primaryLoad, float bytes) { |
|     this.totalLoad += load; |
|     this.totalPrimaryLoad += primaryLoad; |
|     this.totalBytes += bytes; |
|   } |
| |
|   /** |
|    * Adds the bucket rollup to this member and, on success, adds each |
|    * colocated bucket to the colocated member of the same region. |
|    */ |
|   @Override |
|   public boolean addBucket(Bucket bucket) { |
|     if (!super.addBucket(bucket)) { |
|       return false; |
|     } |
|     BucketRollup rollup = (BucketRollup) bucket; |
|     for (Map.Entry<String, Member> perRegion : getColocatedMembers().entrySet()) { |
|       Member regionMember = perRegion.getValue(); |
|       Bucket regionBucket = rollup.getColocatedBuckets().get(perRegion.getKey()); |
|       if (regionBucket != null) { |
|         regionMember.addBucket(regionBucket); |
|       } |
|     } |
|     return true; |
|   } |
| |
|   /** |
|    * Removes the bucket rollup from this member and, on success, removes each |
|    * colocated bucket from the colocated member of the same region. |
|    */ |
|   @Override |
|   public boolean removeBucket(Bucket bucket) { |
|     if (!super.removeBucket(bucket)) { |
|       return false; |
|     } |
|     BucketRollup rollup = (BucketRollup) bucket; |
|     for (Map.Entry<String, Member> perRegion : getColocatedMembers().entrySet()) { |
|       Member regionMember = perRegion.getValue(); |
|       Bucket regionBucket = rollup.getColocatedBuckets().get(perRegion.getKey()); |
|       if (regionBucket != null) { |
|         regionMember.removeBucket(regionBucket); |
|       } |
|     } |
|     return true; |
|   } |
| |
|   /** |
|    * Records the bucket rollup as a primary of this member and, on success, |
|    * does the same for each colocated bucket on its colocated member. |
|    */ |
|   @Override |
|   public boolean addPrimary(Bucket bucket) { |
|     if (!super.addPrimary(bucket)) { |
|       return false; |
|     } |
|     BucketRollup rollup = (BucketRollup) bucket; |
|     for (Map.Entry<String, Member> perRegion : getColocatedMembers().entrySet()) { |
|       Member regionMember = perRegion.getValue(); |
|       Bucket regionBucket = rollup.getColocatedBuckets().get(perRegion.getKey()); |
|       if (regionBucket != null) { |
|         regionMember.addPrimary(regionBucket); |
|       } |
|     } |
|     return true; |
|   } |
| |
|   /** |
|    * Drops the bucket rollup from this member's primaries and, on success, |
|    * does the same for each colocated bucket on its colocated member. |
|    */ |
|   @Override |
|   public boolean removePrimary(Bucket bucket) { |
|     if (!super.removePrimary(bucket)) { |
|       return false; |
|     } |
|     BucketRollup rollup = (BucketRollup) bucket; |
|     for (Map.Entry<String, Member> perRegion : getColocatedMembers().entrySet()) { |
|       Member regionMember = perRegion.getValue(); |
|       Bucket regionBucket = rollup.getColocatedBuckets().get(perRegion.getKey()); |
|       if (regionBucket != null) { |
|         regionMember.removePrimary(regionBucket); |
|       } |
|     } |
|     return true; |
|   } |
| |
|   /** |
|    * Accepts the bucket only if the rollup itself accepts it and every |
|    * colocated member accepts the colocated bucket of its region. The first |
|    * refusal encountered is returned. |
|    */ |
|   @Override |
|   public RefusalReason willAcceptBucket(Bucket bucket, Member source, boolean checkIPAddress) { |
|     RefusalReason reason = super.willAcceptBucket(bucket, source, checkIPAddress); |
|     if (!reason.willAccept()) { |
|       return reason; |
|     } |
|     BucketRollup rollup = (BucketRollup) bucket; |
|     MemberRollup sourceRollup = (MemberRollup) source; |
|     for (Map.Entry<String, Member> perRegion : getColocatedMembers().entrySet()) { |
|       String region = perRegion.getKey(); |
|       Member regionMember = perRegion.getValue(); |
|       Bucket regionBucket = rollup.getColocatedBuckets().get(region); |
|       if (regionBucket == null) { |
|         continue; |
|       } |
|       // the source may be absent, in which case no per-region source is passed on |
|       Member regionSource = null; |
|       if (sourceRollup != null) { |
|         regionSource = sourceRollup.getColocatedMembers().get(region); |
|       } |
|       reason = regionMember.willAcceptBucket(regionBucket, regionSource, checkIPAddress); |
|       if (!reason.willAccept()) { |
|         return reason; |
|       } |
|     } |
|     return RefusalReason.NONE; |
|   } |
| |
|   Map<String, Member> getColocatedMembers() { |
|     return this.colocatedMembers; |
|   } |
| } |
| |
| /** |
| * Represents the sum of all of the colocated buckets that share |
| * a given bucket id. |
| * @author dsmith |
| * |
| */ |
| protected class BucketRollup extends Bucket { |
|   /** The per-region buckets folded into this rollup, keyed by the region string. */ |
|   private final Map<String, Bucket> colocatedBuckets = new HashMap<String, Bucket>(); |
| |
|   public BucketRollup(int id) { |
|     super(id); |
|   } |
| |
|   /** |
|    * Registers the bucket of one colocated region, adds its load, primary |
|    * load, bytes and offline members to this rollup, and pushes the added |
|    * load to every member rollup currently hosting this bucket. |
|    * |
|    * @param region the region the bucket belongs to |
|    * @param b the bucket of that region |
|    * @return false (and no change) if the region was already registered |
|    */ |
|   public boolean addColocatedBucket(String region, Bucket b) { |
|     Map<String, Bucket> byRegion = this.getColocatedBuckets(); |
|     if (byRegion.containsKey(region)) { |
|       return false; |
|     } |
|     byRegion.put(region, b); |
|     this.load += b.getLoad(); |
|     this.primaryLoad += b.getPrimaryLoad(); |
|     this.bytes += b.getBytes(); |
|     this.offlineMembers.addAll(b.getOfflineMembers()); |
| |
|     // The rollup just grew, so the hosting members' totals must grow too. |
|     // Only the member that is the primary is charged the primary load. |
|     for (Member host : getMembersHosting()) { |
|       MemberRollup hostRollup = (MemberRollup) host; |
|       float addedPrimaryLoad = (this.getPrimary() == host) ? b.getPrimaryLoad() : 0; |
|       hostRollup.updateLoad(b.getLoad(), addedPrimaryLoad, b.getBytes()); |
|     } |
|     return true; |
|   } |
| |
|   /** |
|    * Adds the member rollup as a host and, on success, adds the colocated |
|    * member of each region as a host of that region's bucket. |
|    */ |
|   @Override |
|   public boolean addMember(Member targetMember) { |
|     if (!super.addMember(targetMember)) { |
|       return false; |
|     } |
|     MemberRollup targetRollup = (MemberRollup) targetMember; |
|     for (Map.Entry<String, Bucket> perRegion : getColocatedBuckets().entrySet()) { |
|       Bucket regionBucket = perRegion.getValue(); |
|       Member regionMember = targetRollup.getColocatedMembers().get(perRegion.getKey()); |
|       if (regionMember != null) { |
|         regionBucket.addMember(regionMember); |
|       } |
|     } |
|     return true; |
|   } |
| |
|   /** |
|    * Removes the member rollup as a host and, on success, removes the |
|    * colocated member of each region from that region's bucket. |
|    */ |
|   @Override |
|   public boolean removeMember(Member targetMember) { |
|     if (!super.removeMember(targetMember)) { |
|       return false; |
|     } |
|     MemberRollup targetRollup = (MemberRollup) targetMember; |
|     for (Map.Entry<String, Bucket> perRegion : getColocatedBuckets().entrySet()) { |
|       Bucket regionBucket = perRegion.getValue(); |
|       Member regionMember = targetRollup.getColocatedMembers().get(perRegion.getKey()); |
|       if (regionMember != null) { |
|         regionBucket.removeMember(regionMember); |
|       } |
|     } |
|     return true; |
|   } |
| |
|   /** |
|    * Sets the primary of the rollup and, when the target is not null, sets |
|    * the colocated member of each region as primary of that region's bucket |
|    * with the same primary load value. |
|    */ |
|   @Override |
|   public void setPrimary(Member targetMember, float primaryLoad) { |
|     super.setPrimary(targetMember, primaryLoad); |
|     if (targetMember == null) { |
|       return; |
|     } |
|     MemberRollup targetRollup = (MemberRollup) targetMember; |
|     for (Map.Entry<String, Bucket> perRegion : getColocatedBuckets().entrySet()) { |
|       Bucket regionBucket = perRegion.getValue(); |
|       Member regionMember = targetRollup.getColocatedMembers().get(perRegion.getKey()); |
|       if (regionMember != null) { |
|         regionBucket.setPrimary(regionMember, primaryLoad); |
|       } |
|     } |
|   } |
| |
|   Map<String, Bucket> getColocatedBuckets() { |
|     return this.colocatedBuckets; |
|   } |
| } |
| |
| /** |
| * Represents a single member of the distributed system. |
| */ |
| protected class Member implements Comparable<Member> { |
| private final InternalDistributedMember memberId; |
| protected float weight; |
| protected float totalLoad; |
| protected float totalPrimaryLoad; |
| protected long totalBytes; |
| protected long localMaxMemory; |
| private final Set<Bucket> buckets = new TreeSet<Bucket>(); |
| private final Set<Bucket> primaryBuckets = new TreeSet<Bucket>(); |
| private final boolean isCritical; |
| private final boolean enforceLocalMaxMemory; |
| |
| public Member(InternalDistributedMember memberId, boolean isCritical, boolean enforceLocalMaxMemory) { |
| this.memberId = memberId; |
| this.isCritical = isCritical; |
| this.enforceLocalMaxMemory = enforceLocalMaxMemory; |
| } |
| |
| public Member(InternalDistributedMember memberId, float weight, long localMaxMemory, boolean isCritical, boolean enforceLocalMaxMemory) { |
| this(memberId, isCritical, enforceLocalMaxMemory); |
| this.weight = weight; |
| this.localMaxMemory = localMaxMemory; |
| } |
| |
| /** |
| * @param bucket |
| * @param sourceMember |
| * the member we will be moving this bucket off of |
| * @param checkZone true if we should not put two copies |
| * of a bucket on two nodes with the same IP address. |
| */ |
| public RefusalReason willAcceptBucket(Bucket bucket, Member sourceMember, boolean checkZone) { |
| //make sure this member is not already hosting this bucket |
| if(getBuckets().contains(bucket)) { |
| return RefusalReason.ALREADY_HOSTING; |
| } |
| // If node is not fully initialized yet, then skip this node (SQLFabric |
| // DDL replay in progress). |
| final GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); |
| if (cache != null && cache.isUnInitializedMember(getMemberId())) { |
| return RefusalReason.UNITIALIZED_MEMBER; |
| } |
| //Check the ip address |
| if(checkZone) { |
| //If the source member is equivalent to the target member, go |
| //ahead and allow the bucket move (it's not making our redundancy worse). |
| //TODO we could have some logic to prefer moving to different ip addresses |
| //Probably that logic should be another stage after redundancy recovery, like |
| //improveRedundancy. |
| boolean sourceIsEquivalent = sourceMember != null |
| && addressComparor.areSameZone(getMemberId(), |
| sourceMember.getDistributedMember()); |
| if (sourceMember == null || !sourceIsEquivalent) { |
| for(Member hostingMember : bucket.getMembersHosting()) { |
| if ((!hostingMember.equals(sourceMember) || addressComparor.enforceUniqueZones()) |
| && addressComparor.areSameZone(getMemberId(), |
| hostingMember.getDistributedMember())) { |
| if(logger.isDebugEnabled()) { |
| logger.debug("Member {} would prefer not to host {} because it is already on another member with the same redundancy zone", this, bucket); |
| } |
| return RefusalReason.SAME_ZONE; |
| } |
| } |
| } |
| } |
| |
| //check the localMaxMemory |
| if(this.enforceLocalMaxMemory && this.totalBytes + bucket.getBytes() > this.localMaxMemory) { |
| if(logger.isDebugEnabled()) { |
| logger.debug("Member {} won't host bucket {} because it doesn't have enough space", this, bucket); |
| } |
| return RefusalReason.LOCAL_MAX_MEMORY_FULL; |
| } |
| |
| //check to see if the heap is critical |
| if(isCritical) { |
| if(logger.isDebugEnabled()) { |
| logger.debug("Member {} won't host bucket {} because it's heap is critical",this, bucket); |
| } |
| return RefusalReason.CRITICAL_HEAP; |
| } |
| |
| return RefusalReason.NONE; |
| } |
| |
| public boolean addBucket(Bucket bucket) { |
| if(getBuckets().add(bucket)) { |
| bucket.addMember(this); |
| this.totalBytes += bucket.getBytes(); |
| this.totalLoad += bucket.getLoad(); |
| return true; |
| } |
| return false; |
| } |
| |
| public boolean removeBucket(Bucket bucket) { |
| if(getBuckets().remove(bucket)) { |
| bucket.removeMember(this); |
| this.totalBytes -= bucket.getBytes(); |
| this.totalLoad -= bucket.getLoad(); |
| return true; |
| } |
| return false; |
| } |
| |
| public boolean removePrimary(Bucket bucket) { |
| if(getPrimaryBuckets().remove(bucket)) { |
| this.totalPrimaryLoad -= bucket.getPrimaryLoad(); |
| return true; |
| } |
| return false; |
| } |
| |
| public boolean addPrimary(Bucket bucket) { |
| if(getPrimaryBuckets().add(bucket)) { |
| this.totalPrimaryLoad += bucket.getPrimaryLoad(); |
| return true; |
| } |
| return false; |
| } |
| |
| public int getBucketCount() { |
| return getBuckets().size(); |
| } |
| |
| public long getConfiguredMaxMemory() { |
| return this.localMaxMemory; |
| } |
| |
| public InternalDistributedMember getDistributedMember() { |
| return getMemberId(); |
| } |
| |
| public int getPrimaryCount() { |
| int primaryCount = 0; |
| for(Bucket bucket: getBuckets()) { |
| if(this.equals(bucket.primary)) { |
| primaryCount++; |
| } |
| } |
| return primaryCount; |
| } |
| |
| public long getSize() { |
| return this.totalBytes; |
| } |
| |
| public float getTotalLoad() { |
| return this.totalLoad; |
| } |
| |
| public float getWeight() { |
| return this.weight; |
| } |
| |
| @Override |
| public String toString() { |
| return "Member(id=" + getMemberId()+ ")"; |
| } |
| |
| public float getPrimaryLoad() { |
| return this.totalPrimaryLoad; |
| } |
| |
| protected Set<Bucket> getBuckets() { |
| return this.buckets; |
| } |
| |
| private InternalDistributedMember getMemberId() { |
| return this.memberId; |
| } |
| |
| private Set<Bucket> getPrimaryBuckets() { |
| return this.primaryBuckets; |
| } |
| |
| @Override |
| public int hashCode() { |
| return memberId.hashCode(); |
| } |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof Member)) { |
| return false; |
| } |
| Member o = (Member)other; |
| return this.memberId.equals(o.memberId); |
| } |
| |
| public int compareTo(Member other) { |
| // memberId is InternalDistributedMember which implements Comparable |
| return this.memberId.compareTo(other.memberId); |
| } |
| } |
| |
| /** |
| * Represents a single bucket. |
| */ |
| protected class Bucket implements Comparable<Bucket> { |
| protected long bytes; |
| private final int id; |
| protected float load; |
| protected float primaryLoad; |
| private int redundancy = -1; |
| private final Set<Member> membersHosting = new TreeSet<Member>(); |
| private Member primary; |
| protected Set<PersistentMemberID> offlineMembers = new HashSet<PersistentMemberID>(); |
| |
| public Bucket(int id) { |
| this.id = id; |
| } |
| |
| public Bucket(int id, float load, long bytes, Set<PersistentMemberID> offlineMembers) { |
| this(id); |
| this.load = load; |
| this.bytes = bytes; |
| this.offlineMembers = offlineMembers; |
| } |
| |
| public void setPrimary(Member member, float primaryLoad) { |
| if(this.primary == INVALID_MEMBER) { |
| return; |
| } |
| if(this.primary != null) { |
| this.primary.removePrimary(this); |
| } |
| this.primary = member; |
| this.primaryLoad = primaryLoad; |
| if(primary != INVALID_MEMBER && primary != null) { |
| addMember(primary); |
| member.addPrimary(this); |
| } |
| } |
| |
| /** |
| * @param targetMember |
| */ |
| public boolean addMember(Member targetMember) { |
| if(this.getMembersHosting().add(targetMember)) { |
| this.redundancy++; |
| targetMember.addBucket(this); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| public boolean removeMember(Member targetMember) { |
| if(this.getMembersHosting().remove(targetMember)) { |
| if(targetMember == this.primary) { |
| setPrimary(null, 0); |
| } |
| this.redundancy--; |
| targetMember.removeBucket(this); |
| return true; |
| } |
| return false; |
| } |
| |
| public int getRedundancy() { |
| return this.redundancy + offlineMembers.size(); |
| } |
| |
| public int getOnlineRedundancy() { |
| return this.redundancy; |
| } |
| |
| public float getLoad() { |
| return this.load; |
| } |
| |
| public int getId() { |
| return this.id; |
| } |
| |
| public long getBytes() { |
| return this.bytes; |
| } |
| |
| @Override |
| public String toString() { |
| return "Bucket(id=" + getId() + ",load=" + load +")"; |
| } |
| |
| public float getPrimaryLoad() { |
| return this.primaryLoad; |
| } |
| |
| public Set<Member> getMembersHosting() { |
| return this.membersHosting; |
| } |
| |
| public Member getPrimary() { |
| return this.primary; |
| } |
| |
| public Collection<? extends PersistentMemberID> getOfflineMembers() { |
| return offlineMembers; |
| } |
| |
| @Override |
| public int hashCode() { |
| return this.id; |
| } |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof Bucket)) { |
| return false; |
| } |
| Bucket o = (Bucket)other; |
| return this.id == o.id; |
| } |
| public int compareTo(Bucket other) { |
| if (this.id < other.id) { |
| return -1; |
| } else if (this.id > other.id) { |
| return 1; |
| } else { |
| return 0; |
| } |
| } |
| } |
| |
| /** |
| * Represents a move from one node to another. Used |
| * to keep track of moves that we have already attempted |
| * that have failed. |
| * |
| * @author dsmith |
| * |
| */ |
| protected static class Move { |
|   private final Member source; |
|   private final Member target; |
|   private final Bucket bucket; |
| |
|   public Move(Member source, Member target, Bucket bucket) { |
|     this.source = source; |
|     this.target = target; |
|     this.bucket = bucket; |
|   } |
| |
|   /** |
|    * @return the source |
|    */ |
|   public Member getSource() { |
|     return this.source; |
|   } |
| |
|   /** |
|    * @return the target |
|    */ |
|   public Member getTarget() { |
|     return this.target; |
|   } |
| |
|   /** |
|    * @return the bucket |
|    */ |
|   public Bucket getBucket() { |
|     return this.bucket; |
|   } |
| |
|   /** Combines the hashes of bucket, source and target, in that order; null hashes as 0. */ |
|   @Override |
|   public int hashCode() { |
|     int result = 31 + hashOrZero(this.bucket); |
|     result = 31 * result + hashOrZero(this.source); |
|     result = 31 * result + hashOrZero(this.target); |
|     return result; |
|   } |
| |
|   /** |
|    * Two moves are equal when they are of the same class and their bucket, |
|    * source and target are pairwise equal (or both null). |
|    */ |
|   @Override |
|   public boolean equals(Object obj) { |
|     if (this == obj) { |
|       return true; |
|     } |
|     if (obj == null || getClass() != obj.getClass()) { |
|       return false; |
|     } |
|     Move that = (Move) obj; |
|     return sameOrBothNull(this.bucket, that.bucket) |
|         && sameOrBothNull(this.source, that.source) |
|         && sameOrBothNull(this.target, that.target); |
|   } |
| |
|   /** Hash code of the value, or 0 for null. */ |
|   private static int hashOrZero(Object value) { |
|     return value == null ? 0 : value.hashCode(); |
|   } |
| |
|   /** True when both are null, or when mine is non-null and equals theirs. */ |
|   private static boolean sameOrBothNull(Object mine, Object theirs) { |
|     return mine == null ? theirs == null : mine.equals(theirs); |
|   } |
| } |
| |
| public static interface AddressComparor { |
| |
|   /** |
|    * Return true if the two addresses are equivalent |
|    */ |
|   boolean areSameZone(InternalDistributedMember member1, InternalDistributedMember member2); |
| |
|   /** |
|    * Consulted by Member.willAcceptBucket: when this returns true, the source |
|    * member of a move is also checked for being in the same zone as the target. |
|    */ |
|   boolean enforceUniqueZones(); |
| } |
| |
| public static enum RefusalReason { |
|   NONE, |
|   ALREADY_HOSTING, |
|   UNITIALIZED_MEMBER, |
|   SAME_ZONE, |
|   LOCAL_MAX_MEMORY_FULL, |
|   CRITICAL_HEAP; |
| |
|   /** |
|    * @return true only for {@link #NONE}, i.e. when nothing speaks against the move |
|    */ |
|   public boolean willAccept() { |
|     return this == NONE; |
|   } |
| |
|   /** |
|    * Builds a human readable explanation of this refusal. |
|    * |
|    * @param source the member the bucket would move from; not used in any message |
|    * @param target the member that refused; read for every reason except NONE |
|    * @param bucket the bucket that was refused; read for the hosting, zone and memory reasons |
|    * @return the explanation text |
|    */ |
|   public String formatMessage(Member source, Member target, Bucket bucket) { |
|     switch (this) { |
|     case NONE: |
|       return "No reason, the move should be allowed."; |
|     case ALREADY_HOSTING: |
|       return "Target member " + target.getMemberId() |
|           + " is already hosting bucket " + bucket.getId(); |
|     case UNITIALIZED_MEMBER: |
|       return "Target member " + target.getMemberId() |
|           + " is not fully initialized"; |
|     case SAME_ZONE: |
|       return "Target member " |
|           + target.getMemberId() |
|           + " is in the same redundancy zone as other members hosting bucket " |
|           + bucket.getId() + ": " + bucket.getMembersHosting(); |
|     case LOCAL_MAX_MEMORY_FULL: |
|       // "remaining" is the space still free under the limit, not the bytes |
|       // already in use; printing totalBytes here made the message misleading. |
|       return "Target member " + target.getMemberId() |
|           + " does not have space within it's local max memory for bucket " |
|           + bucket.getId() + ". Bucket Size " + bucket.getBytes() |
|           + " local max memory: " + target.localMaxMemory + " remaining: " |
|           + (target.localMaxMemory - target.totalBytes); |
|     case CRITICAL_HEAP: |
|       return "Target member " |
|           + target.getMemberId() |
|           + " has reached its critical heap percentage, and cannot accept more data"; |
|     default: |
|       return this.toString(); |
|     } |
|   } |
| } |
| } |