/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.partitioned.rebalance.model;

import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.logging.log4j.Logger;

import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.partition.PartitionMemberInfo;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.FixedPartitionAttributesImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.partitioned.InternalPartitionDetails;
import org.apache.geode.internal.cache.partitioned.OfflineMemberDetails;
import org.apache.geode.internal.cache.partitioned.PRLoad;
import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
import org.apache.geode.internal.cache.partitioned.rebalance.BucketOperator;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.logging.LogService;

/**
 * A model of the load on all of the members for a partitioned region. This model is used to find
 * the best members to create buckets on or move buckets or primaries to. All of the actual work of
 * creating a copy, moving a primary, etc. is performed by the BucketOperator that is passed to the
 * constructor.
 *
 * To use, create a model and populate it using the addRegion method. addRegion takes a region
 * argument, to indicate which region the data is for. All of the regions added to a single model
 * are assumed to be colocated, and the model adds together the load from each of the individual
 * regions to balance all of the regions together.
 *
 * Rebalancing operations are performed by repeatedly calling model.nextStep until it returns
 * false. Each call to nextStep should perform another operation. The model will make callbacks to
 * the BucketOperator you provide to the constructor to perform the actual create or move.
 *
 * While creating redundant copies or moving buckets, this model tries to minimize the standard
 * deviation in the weighted loads for the members. The weighted load for the member is the sum of
 * the load for all of the buckets on the member divided by that member's weight.
 *
 * This model is not threadsafe.
 *
 * @since GemFire 6.0
 */
@SuppressWarnings("synthetic-access")
public class PartitionedRegionLoadModel {
  private static final Logger logger = LogService.getLogger();

  /**
   * A comparator that is used to sort buckets in the order that we should satisfy redundancy - most
   * needy buckets first.
   *
   * Buckets are ordered by ascending redundancy, then by descending load, and finally by ascending
   * bucket id so that two distinct buckets never compare as equal.
   */
  @Immutable
  private static final Comparator<Bucket> REDUNDANCY_COMPARATOR = (o1, o2) -> {
    // put the buckets with the lowest redundancy first; Integer.compare avoids the overflow
    // that subtracting two ints can produce
    int result = Integer.compare(o1.getRedundancy(), o2.getRedundancy());
    if (result == 0) {
      // put the bucket with the largest load first. This should give us a
      // better chance of finding a place to put it
      result = Float.compare(o2.getLoad(), o1.getLoad());
    }
    if (result == 0) {
      // finally, just use the id so the comparator doesn't swallow buckets
      // with the same load
      result = Integer.compare(o1.getId(), o2.getId());
    }

    return result;
  };

  /** Number of bytes in a megabyte; used by {@link #toString()} to print sizes in MB. */
  private static final long MEGABYTES = 1024 * 1024;

  /**
   * A member to represent inconsistent data. For example, if two members think they are the primary
   * for a bucket, we will set the primary to invalid, so it won't be a candidate for rebalancing.
   */
  @Immutable
  public static final MemberRollup INVALID_MEMBER = new MemberRollup(null, null, false, false);

  /**
   * The rollup of every bucket across all colocated regions, indexed by bucket id. An entry is
   * null when the bucket is not hosted by every region added so far.
   */
  private final BucketRollup[] buckets;

  /**
   * A map of all members that host this partitioned region
   */
  private final Map<InternalDistributedMember, MemberRollup> members = new HashMap<>();

  /**
   * The set of all regions that are colocated in this model.
   */
  private final Set<String> allColocatedRegions = new HashSet<>();

  /**
   * The sorted set of buckets that have low redundancy. Null until {@link #initialize()} is called.
   */
  private SortedSet<BucketRollup> lowRedundancyBuckets = null;
  /**
   * The sorted set of buckets that have more online copies than required. Null until
   * {@link #initialize()} is called.
   */
  private SortedSet<BucketRollup> overRedundancyBuckets = null;
  /** Primary moves that have already been tried; they are not proposed again. */
  private final Collection<Move> attemptedPrimaryMoves = new HashSet<>();
  /** Bucket moves that have already been tried; they are not proposed again. */
  private final Collection<Move> attemptedBucketMoves = new HashSet<>();
  /** Bucket creations that failed; they are not proposed again. */
  private final Collection<Move> attemptedBucketCreations = new HashSet<>();
  /** Bucket removals that failed; they are not proposed again. */
  private final Collection<Move> attemptedBucketRemoves = new HashSet<>();

  /** Performs the actual create/move/remove operations on behalf of the model. */
  private final BucketOperator operator;
  /** The redundancy level each bucket is expected to reach. */
  private final int requiredRedundancy;

  /** The average primary load on a member; -1 means "not yet computed" */
  private float primaryAverage = -1;
  /** The average bucket load on a member; -1 means "not yet computed" */
  private float averageLoad = -1;
  /**
   * The minimum improvement in variance that we'll consider worth moving a primary; -1 means "not
   * yet computed"
   */
  private double minPrimaryImprovement = -1;
  /**
   * The minimum improvement in variance that we'll consider worth moving a bucket; -1 means "not
   * yet computed"
   */
  private double minImprovement = -1;

  /** Handed to every member created by this model and consulted for unique-zone enforcement. */
  private final AddressComparor addressComparor;

  /** Members that are flagged as critical when they are added to the model. */
  private final Set<InternalDistributedMember> criticalMembers;

  /** The region this model was created for; see {@link #getPartitionedRegion()}. */
  private final PartitionedRegion partitionedRegion;

  /**
   * Builds an empty load model. Regions are added afterwards with
   * {@link #addRegion(String, Collection, OfflineMemberDetails, boolean)}.
   *
   * @param operator the operator which performs the actual creates/moves for buckets
   * @param redundancyLevel the expected redundancy level for the region
   * @param numBuckets the number of bucket slots tracked by the model
   * @param addressComparor passed to every member created by the model and used to decide whether
   *        unique zones are enforced
   * @param criticalMembers the members that should be marked as critical when added
   * @param region the partitioned region this model is built for
   */
  public PartitionedRegionLoadModel(BucketOperator operator, int redundancyLevel, int numBuckets,
      AddressComparor addressComparor, Set<InternalDistributedMember> criticalMembers,
      PartitionedRegion region) {
    // collaborators
    this.operator = operator;
    this.addressComparor = addressComparor;
    this.partitionedRegion = region;
    this.criticalMembers = criticalMembers;

    // sizing and redundancy target
    this.requiredRedundancy = redundancyLevel;
    this.buckets = new BucketRollup[numBuckets];
  }

  /**
   * Add a region to the model. All regions that are added are assumed to be colocated. The first
   * region added to the model should be the parent region. The parent region is expected to have at
   * least as many members as child regions; it may have more. If the parent has more members than
   * child regions those members will be considered invalid.
   *
   * The method works in four phases: build per-region members and buckets from the supplied
   * details, merge the members into the cross-region member rollups, merge the buckets into the
   * cross-region bucket rollups, and finally drop any member rollup that does not host every
   * colocated region added so far.
   *
   * @param region the name under which this region's members and buckets are registered
   * @param memberDetailSet A set of details about each member.
   * @param offlineDetails supplies the offline persistent members for each bucket id
   * @param enforceLocalMaxMemory passed through to every member created for this region
   */
  public void addRegion(String region,
      Collection<? extends InternalPartitionDetails> memberDetailSet,
      OfflineMemberDetails offlineDetails, boolean enforceLocalMaxMemory) {
    this.allColocatedRegions.add(region);
    // build up a list of members and an array of buckets for this
    // region. Each bucket has a reference to all of the members
    // that host it and each member has a reference to all of the buckets
    // it hosts
    Map<InternalDistributedMember, Member> regionMember = new HashMap<>();
    Bucket[] regionBuckets = new Bucket[this.buckets.length];
    for (InternalPartitionDetails memberDetails : memberDetailSet) {
      InternalDistributedMember memberId =
          (InternalDistributedMember) memberDetails.getDistributedMember();

      boolean isCritical = criticalMembers.contains(memberId);
      Member member = new Member(addressComparor, memberId, memberDetails.getPRLoad().getWeight(),
          memberDetails.getConfiguredMaxMemory(), isCritical, enforceLocalMaxMemory);
      regionMember.put(memberId, member);

      PRLoad load = memberDetails.getPRLoad();
      for (int i = 0; i < regionBuckets.length; i++) {
        // a positive read load means this member hosts bucket i
        if (load.getReadLoad(i) > 0) {
          Bucket bucket = regionBuckets[i];
          if (bucket == null) {
            // first member seen hosting this bucket: its load and size describe the bucket
            Set<PersistentMemberID> offlineMembers = offlineDetails.getOfflineMembers(i);
            bucket =
                new Bucket(i, load.getReadLoad(i), memberDetails.getBucketSize(i), offlineMembers);
            regionBuckets[i] = bucket;
          }
          bucket.addMember(member);
          // a positive write load means this member claims to be the primary for bucket i
          if (load.getWriteLoad(i) > 0) {
            if (bucket.getPrimary() == null) {
              bucket.setPrimary(member, load.getWriteLoad(i));
            } else if (!bucket.getPrimary().equals(member)) {
              // two different members claim the primary; mark it invalid
              bucket.setPrimary(INVALID_MEMBER, 1);
            }
          }
        }
      }
    }

    // add each member for this region to a rollup of all colocated
    // regions
    for (Member member : regionMember.values()) {
      InternalDistributedMember memberId = member.getDistributedMember();
      MemberRollup memberSum = this.members.get(memberId);
      boolean isCritical = criticalMembers.contains(memberId);
      if (memberSum == null) {
        memberSum = new MemberRollup(addressComparor, memberId, isCritical, enforceLocalMaxMemory);
        this.members.put(memberId, memberSum);
      }

      memberSum.addColocatedMember(region, member);
    }

    // Now, add the region to the rollups of the colocated
    // regions and buckets
    for (int i = 0; i < this.buckets.length; i++) {
      if (regionBuckets[i] == null) {
        // do nothing, this bucket is not hosted for this region.
        // [sumedh] remove from buckets array too to be consistent since
        // this method will be invoked repeatedly for all colocated regions,
        // and then we may miss some colocated regions for a bucket leading
        // to all kinds of issues later
        this.buckets[i] = null;
        continue;
      }
      if (this.buckets[i] == null) {
        // If this is the first region we have seen that is hosting this bucket, create a bucket
        // rollup
        this.buckets[i] = new BucketRollup(i);
      }

      // Add all of the members hosting the bucket to the rollup
      for (Member member : regionBuckets[i].getMembersHosting()) {
        InternalDistributedMember memberId = member.getDistributedMember();
        this.buckets[i].addMember(this.members.get(memberId));
      }

      // set the primary for the rollup
      if (regionBuckets[i].getPrimary() != null) {
        if (this.buckets[i].getPrimary() == null) {
          InternalDistributedMember memberId = regionBuckets[i].getPrimary().getDistributedMember();
          this.buckets[i].setPrimary(this.members.get(memberId), 0);
        } else {
          // the rollup already has a primary; it must agree with this region's primary
          if (!(this.buckets[i].getPrimary() == INVALID_MEMBER)) {
            if (!this.buckets[i].getPrimary().getDistributedMember()
                .equals(regionBuckets[i].getPrimary().getDistributedMember())) {
              if (logger.isDebugEnabled()) {
                logger.debug(
                    "PartitionedRegionLoadModel - Setting bucket {} to INVALID because it is the primary on two members.This could just be a race in the collocation of data. member1={} member2={}",
                    this.buckets[i], this.buckets[i].getPrimary(), regionBuckets[i].getPrimary());
              }
              this.buckets[i].setPrimary(INVALID_MEMBER, 0);
            }
          }
        }
      }
      this.buckets[i].addColocatedBucket(region, regionBuckets[i]);
    }

    // TODO rebalance - there is a possibility of adding members
    // back here, which I don't like. I think maybe all of the regions should be in the
    // constructor for the load model, and then when the constructor is done
    // we can do the validation.
    // If any members don't have this new region, remove them.
    for (Iterator<Entry<InternalDistributedMember, MemberRollup>> itr =
        members.entrySet().iterator(); itr.hasNext();) {
      MemberRollup memberRollup = itr.next().getValue();
      if (!memberRollup.getColocatedMembers().keySet().equals(this.allColocatedRegions)) {
        itr.remove();
        if (logger.isDebugEnabled()) {
          logger.debug(
              "PartitionedRegionLoadModel - removing member {} from the consideration because it doesn't have all of the colocated regions. Expected={}, was={}",
              memberRollup, allColocatedRegions, memberRollup.getColocatedMembers());
        }
        // This state should never happen
        if (!memberRollup.getBuckets().isEmpty()) {
          logger.warn(
              "PartitionedRegionLoadModel - member {} has incomplete colocation, but it has buckets for some regions. Should have colocated regions {} but had {} and contains buckets {}",
              new Object[] {memberRollup, this.allColocatedRegions,
                  memberRollup.getColocatedMembers().keySet(), memberRollup.getBuckets()});
        }
        // iterate over a copy because removeMember is called on each bucket while looping
        for (Bucket bucket : new HashSet<Bucket>(memberRollup.getBuckets())) {
          bucket.removeMember(memberRollup);
        }
      }
    }
  }

  /**
   * Prepares the model for use once all regions have been added: clears the cached averages and
   * builds the sorted sets of low-redundancy and over-redundancy buckets.
   */
  public void initialize() {
    this.resetAverages();
    this.initLowRedundancyBuckets();
    this.initOverRedundancyBuckets();
  }

  /**
   * @return the sorted set of buckets that still need redundant copies, or null if
   *         {@link #initialize()} has not been called yet
   */
  public SortedSet<BucketRollup> getLowRedundancyBuckets() {
    return this.lowRedundancyBuckets;
  }

  /**
   * @return the sorted set of buckets that have more online copies than required, or null if
   *         {@link #initialize()} has not been called yet
   */
  public SortedSet<BucketRollup> getOverRedundancyBuckets() {
    return this.overRedundancyBuckets;
  }

  /**
   * @return whatever the address comparor given to the constructor reports for unique-zone
   *         enforcement
   */
  public boolean enforceUniqueZones() {
    return this.addressComparor.enforceUniqueZones();
  }

  /**
   * Drops a bucket from the set of low-redundancy buckets so it is no longer considered.
   *
   * @param first the bucket to stop considering
   */
  public void ignoreLowRedundancyBucket(BucketRollup first) {
    lowRedundancyBuckets.remove(first);
  }

  /**
   * Drops a bucket from the set of over-redundancy buckets so it is no longer considered.
   *
   * @param first the bucket to stop considering
   */
  public void ignoreOverRedundancyBucket(BucketRollup first) {
    overRedundancyBuckets.remove(first);
  }

  /**
   * Looks up the rollup for a member.
   *
   * @param target the id of the member to look up
   * @return the member's rollup, or null if the model does not contain that member
   */
  public MemberRollup getMember(InternalDistributedMember target) {
    return this.members.get(target);
  }

  /**
   * @return the model's own array of bucket rollups, indexed by bucket id (not a copy); entries
   *         may be null
   */
  public BucketRollup[] getBuckets() {
    return this.buckets;
  }

  /**
   * @return the full path of the partitioned region this model was created for
   */
  public String getName() {
    PartitionedRegion region = getPartitionedRegion();
    return region.getFullPath();
  }

  /**
   * @return the partitioned region that was passed to the constructor
   */
  public PartitionedRegion getPartitionedRegion() {
    // TODO - ideally this model would not hold a reference to the partitioned region
    // object at all. The fixed PR code currently depends on it and needs refactoring
    // before the reference can be removed.
    return this.partitionedRegion;
  }

  /**
   * Collects the size in bytes of a bucket in each colocated region.
   *
   * @param bucket the bucket rollup to inspect
   * @return a new map from region name to the byte size of that region's bucket
   */
  private Map<String, Long> getColocatedRegionSizes(BucketRollup bucket) {
    Map<String, Long> sizes = new HashMap<>();
    bucket.getColocatedBuckets().forEach((name, colocated) -> sizes.put(name, colocated.getBytes()));
    return sizes;
  }

  /**
   * Trigger the creation of a redundant bucket, potentially asynchronously.
   *
   * This method will find the best node to create a redundant bucket and invoke the bucket operator
   * to create a bucket on that node. Because the bucket operator is asynchronous, the bucket may
   * not be created immediately, but the model will be updated regardless. Invoke
   * {@link #waitForOperations()} to wait for those operations to actually complete
   *
   * If the operator reports a failure, the model change is rolled back and the attempt is
   * recorded so the same creation is not proposed again.
   *
   * @param bucket the bucket that needs another copy
   * @param targetMember the member that should host the new copy
   */
  public void createRedundantBucket(final BucketRollup bucket, final Member targetMember) {
    // sizes are captured before the model is modified
    Map<String, Long> colocatedRegionSizes = getColocatedRegionSizes(bucket);
    final Move move = new Move(null, targetMember, bucket);

    // the bucket is removed from the sorted set before its state is changed and re-added after
    this.lowRedundancyBuckets.remove(bucket);
    bucket.addMember(targetMember);
    // put the bucket back into the list if we still need to satisfy redundancy for
    // this bucket
    if (bucket.getRedundancy() < this.requiredRedundancy) {
      this.lowRedundancyBuckets.add(bucket);
    }
    resetAverages();

    this.operator.createRedundantBucket(targetMember.getMemberId(), bucket.getId(),
        colocatedRegionSizes, new BucketOperator.Completion() {
          @Override
          public void onSuccess() {}

          @Override
          public void onFailure() {
            // If the bucket creation failed, we need to undo the changes
            // we made to the model
            attemptedBucketCreations.add(move);
            // remove the bucket from lowRedundancyBuckets before mutating the state
            lowRedundancyBuckets.remove(bucket);
            bucket.removeMember(targetMember);
            if (bucket.getRedundancy() < requiredRedundancy) {
              lowRedundancyBuckets.add(bucket);
            }
            resetAverages();
          }
        });
  }

  /**
   * Asks the bucket operator to remove a copy of an over-redundant bucket from a member and, if
   * that succeeds, updates the model to match. A failed removal is recorded so it is not proposed
   * again.
   *
   * @param bucket the bucket that has too many copies
   * @param targetMember the member to remove the copy from
   */
  public void remoteOverRedundancyBucket(BucketRollup bucket, Member targetMember) {
    Move removal = new Move(null, targetMember, bucket);
    Map<String, Long> sizes = getColocatedRegionSizes(bucket);

    boolean removed = operator.removeBucket(targetMember.getMemberId(), bucket.getId(), sizes);
    if (!removed) {
      attemptedBucketRemoves.add(removal);
      return;
    }

    // take the bucket out of the sorted set before changing it, then re-add it only if it
    // still has more online copies than required
    overRedundancyBuckets.remove(bucket);
    bucket.removeMember(targetMember);
    if (bucket.getOnlineRedundancy() > requiredRedundancy) {
      overRedundancyBuckets.add(bucket);
    }
    resetAverages();
  }

  /**
   * Rebuilds the sorted set of buckets whose redundancy is non-negative but below the required
   * level.
   */
  private void initLowRedundancyBuckets() {
    SortedSet<BucketRollup> needy = new TreeSet<>(REDUNDANCY_COMPARATOR);
    this.lowRedundancyBuckets = needy;
    for (BucketRollup candidate : buckets) {
      if (candidate == null) {
        continue;
      }
      if (candidate.getRedundancy() < 0) {
        continue;
      }
      if (candidate.getRedundancy() < requiredRedundancy) {
        needy.add(candidate);
      }
    }
  }

  /**
   * Rebuilds the sorted set of buckets whose online redundancy exceeds the required level.
   */
  private void initOverRedundancyBuckets() {
    SortedSet<BucketRollup> excess = new TreeSet<>(REDUNDANCY_COMPARATOR);
    this.overRedundancyBuckets = excess;
    for (BucketRollup candidate : buckets) {
      if (candidate == null) {
        continue;
      }
      if (candidate.getOnlineRedundancy() > requiredRedundancy) {
        excess.add(candidate);
      }
    }
  }

  /**
   * Find the best member to put a new bucket on: the accepting member whose weighted load would be
   * lowest after receiving the bucket, skipping creations that have already failed.
   *
   * @param bucket the bucket we want to create
   * @param checkIPAddress true if we should only consider members that do not have the same IP
   *        Address as a member that already hosts the bucket
   * @return the best creation move, or null if no member qualifies
   */
  public Move findBestTarget(Bucket bucket, boolean checkIPAddress) {
    Move cheapest = null;
    float lowestCost = Float.MAX_VALUE;

    for (Member candidate : members.values()) {
      if (!candidate.willAcceptBucket(bucket, null, checkIPAddress).willAccept()) {
        continue;
      }
      float weighted = (candidate.getTotalLoad() + bucket.getLoad()) / candidate.getWeight();
      if (!(weighted < lowestCost)) {
        continue;
      }
      Move creation = new Move(null, candidate, bucket);
      if (attemptedBucketCreations.contains(creation)) {
        continue;
      }
      lowestCost = weighted;
      cheapest = creation;
    }
    return cheapest;
  }

  /**
   * Find the best member to remove a bucket from: the non-primary host that would remain the most
   * loaded (by weighted load) after the bucket is removed, skipping removals that have already
   * failed.
   *
   * @param bucket the bucket we want to remove a copy of
   * @return the best removal move, or null if no member qualifies
   */
  public Move findBestRemove(Bucket bucket) {
    // Start below every possible load. Float.MIN_VALUE is the smallest POSITIVE float, so using
    // it here would reject any member whose load after the removal is zero or negative.
    float mostLoaded = Float.NEGATIVE_INFINITY;
    Move bestMove = null;

    for (Member member : bucket.getMembersHosting()) {
      float newLoad = (member.getTotalLoad() - bucket.getLoad()) / member.getWeight();
      // never remove the copy held by the primary
      if (newLoad > mostLoaded && !member.equals(bucket.getPrimary())) {
        Move move = new Move(null, member, bucket);
        if (!this.attemptedBucketRemoves.contains(move)) {
          mostLoaded = newLoad;
          bestMove = move;
        }
      }
    }
    return bestMove;
  }

  /**
   * Find a target for a bucket of a fixed partitioned region. When one of the region's fixed
   * partition attributes contains the bucket, the only candidate is the member identified by the
   * region's distribution manager id, provided it is in the model and will accept the bucket.
   *
   * @param bucket the bucket we want to create
   * @param checkIPAddress passed through to the member's acceptance check
   * @return a creation move onto that member, or null if there is no suitable target
   */
  public Move findBestTargetForFPR(Bucket bucket, boolean checkIPAddress) {
    List<FixedPartitionAttributesImpl> fixedPartitions =
        partitionedRegion.getFixedPartitionAttributesImpl();
    if (fixedPartitions == null) {
      return null;
    }

    for (FixedPartitionAttributesImpl fixedPartition : fixedPartitions) {
      if (!fixedPartition.hasBucket(bucket.getId())) {
        continue;
      }
      InternalDistributedMember managerId =
          partitionedRegion.getDistributionManager().getDistributionManagerId();
      // the members map never holds null values, so a null result means "not in the model"
      Member candidate = members.get(managerId);
      if (candidate != null
          && candidate.willAcceptBucket(bucket, null, checkIPAddress).willAccept()) {
        // We should have just one move for creating
        // all the buckets for a FPR on this node.
        return new Move(null, candidate, bucket);
      }
    }

    return null;
  }

  /**
   * Asks the bucket operator to move a primary and, on success, records the new primary in the
   * model. The move is always added to the set of attempted primary moves so it is not proposed
   * again.
   *
   * @param bestMove the primary move to perform
   * @return true if the operator reported that the primary was moved
   */
  public boolean movePrimary(Move bestMove) {
    Bucket bucket = bestMove.getBucket();
    Member from = bestMove.getSource();
    Member to = bestMove.getTarget();

    boolean moved =
        operator.movePrimary(from.getDistributedMember(), to.getDistributedMember(), bucket.getId());
    if (moved) {
      bucket.setPrimary(to, bucket.getPrimaryLoad());
    }

    // the same move must never be attempted twice, otherwise callers could loop forever
    Assert.assertTrue(attemptedPrimaryMoves.add(bestMove),
        "PartitionedRegionLoadModel.movePrimarys - excluded set is not growing, so we probably would have an infinite loop here");

    return moved;
  }

  /**
   * Searches every (source, primary bucket, hosting target) combination for the primary move that
   * most reduces the variance of the weighted primary load. Moves that do not beat the minimum
   * worthwhile improvement, or that were already attempted, are ignored.
   *
   * @return the best primary move, or null if no move is worthwhile
   */
  public Move findBestPrimaryMove() {
    Move winner = null;
    double winningGain = 0;

    for (Member source : members.values()) {
      for (Bucket primaryBucket : source.getPrimaryBuckets()) {
        for (Member target : primaryBucket.getMembersHosting()) {
          if (source.equals(target)) {
            continue;
          }
          double gain = improvement(source.getPrimaryLoad(), source.getWeight(),
              target.getPrimaryLoad(), target.getWeight(), primaryBucket.getPrimaryLoad(),
              getPrimaryAverage());
          if (!(gain > winningGain) || !(gain > getMinPrimaryImprovement())) {
            continue;
          }
          Move candidate = new Move(source, target, primaryBucket);
          if (attemptedPrimaryMoves.contains(candidate)) {
            continue;
          }
          winningGain = gain;
          winner = candidate;
        }
      }
    }
    return winner;
  }

  /**
   * Calculate the target weighted number of primaries on each node. The value is cached until
   * {@link #resetAverages()} is called.
   *
   * @return the total primary load of all members divided by their total weight
   */
  private float getPrimaryAverage() {
    if (primaryAverage != -1) {
      // already computed since the last reset
      return primaryAverage;
    }

    float primarySum = 0;
    float weightSum = 0;
    for (Member each : members.values()) {
      primarySum += each.getPrimaryLoad();
      weightSum += each.getWeight();
    }
    primaryAverage = primarySum / weightSum;
    return primaryAverage;
  }

  /**
   * Calculate the target weighted amount of data on each node. The value is cached until
   * {@link #resetAverages()} is called.
   *
   * @return the total load of all members divided by their total weight
   */
  private float getAverageLoad() {
    if (averageLoad != -1) {
      // already computed since the last reset
      return averageLoad;
    }

    float loadSum = 0;
    float weightSum = 0;
    for (Member each : members.values()) {
      loadSum += each.getTotalLoad();
      weightSum += each.getWeight();
    }
    averageLoad = loadSum / weightSum;
    return averageLoad;
  }

  /**
   * Calculate the minimum improvement in primary variance that we consider worthwhile. Currently
   * this is the improvement in variance that would occur by removing the smallest primary bucket
   * from the member with the largest weight. The value is cached until {@link #resetAverages()} is
   * called.
   *
   * @return the minimum worthwhile improvement for a primary move
   */
  private double getMinPrimaryImprovement() {
    boolean notComputed = (minPrimaryImprovement + 1.0) < .0000001; // i.e. == -1
    if (!notComputed) {
      return minPrimaryImprovement;
    }

    float heaviestWeight = 0;
    float lightestPrimary = 0;
    for (Member each : members.values()) {
      if (each.getWeight() > heaviestWeight) {
        heaviestWeight = each.getWeight();
      }
      for (Bucket primary : each.getPrimaryBuckets()) {
        // zero means "nothing recorded yet"
        if (primary.getPrimaryLoad() < lightestPrimary || lightestPrimary == 0) {
          lightestPrimary = primary.getPrimaryLoad();
        }
      }
    }

    double withBucket = variance(getPrimaryAverage() * heaviestWeight + lightestPrimary,
        heaviestWeight, getPrimaryAverage());
    double withoutBucket =
        variance(getPrimaryAverage() * heaviestWeight, heaviestWeight, getPrimaryAverage());
    minPrimaryImprovement = (withBucket - withoutBucket) / lightestPrimary;
    return minPrimaryImprovement;
  }

  /**
   * Calculate the minimum improvement in load variance that we consider worthwhile. Currently this
   * is the improvement in variance that would occur by removing the smallest bucket from the member
   * with the largest weight. The value is cached until {@link #resetAverages()} is called.
   *
   * @return the minimum worthwhile improvement for a bucket move
   */
  private double getMinImprovement() {
    boolean notComputed = (minImprovement + 1.0) < .0000001; // i.e. == -1
    if (!notComputed) {
      return minImprovement;
    }

    float heaviestWeight = 0;
    float lightestBucket = 0;
    for (Member each : members.values()) {
      if (each.getWeight() > heaviestWeight) {
        heaviestWeight = each.getWeight();
      }
      // find the smallest bucket; once a value is recorded, buckets with no bytes are skipped
      for (Bucket hosted : each.getBuckets()) {
        if (lightestBucket == 0
            || (hosted.getLoad() < lightestBucket && hosted.getBytes() > 0)) {
          lightestBucket = hosted.getLoad();
        }
      }
    }

    double withBucket = variance(getAverageLoad() * heaviestWeight + lightestBucket,
        heaviestWeight, getAverageLoad());
    double withoutBucket =
        variance(getAverageLoad() * heaviestWeight, heaviestWeight, getAverageLoad());
    minImprovement = (withBucket - withoutBucket) / lightestBucket;
    return minImprovement;
  }

  /**
   * Invalidates the cached averages and minimum improvements by setting them back to -1, so they
   * are recomputed the next time they are requested.
   */
  private void resetAverages() {
    primaryAverage = averageLoad = -1;
    minPrimaryImprovement = minImprovement = -1;
  }

  /**
   * Calculate how much the variance in load will decrease for a given move, per unit of bucket
   * size.
   *
   * @param sLoad the current load on the source member
   * @param sWeight the weight of the source member
   * @param tLoad the current load on the target member
   * @param tWeight the weight of the target member
   * @param bucketSize the size of the bucket we're considering moving
   * @param average the target weighted load for all members.
   * @return the change in variance that would occur by making this move, divided by the bucket
   *         size. Essentially variance_before - variance_after, so a positive value means the
   *         variance is decreasing.
   */
  private double improvement(float sLoad, float sWeight, float tLoad, float tWeight,
      float bucketSize, float average) {
    double sourceNow = variance(sLoad, sWeight, average);
    double sourceThen = variance(sLoad - bucketSize, sWeight, average);
    double targetNow = variance(tLoad, tWeight, average);
    double targetThen = variance(tLoad + bucketSize, tWeight, average);

    // keep this left-to-right summation order; regrouping would change floating point rounding
    double decrease = sourceNow - sourceThen + targetNow - targetThen;
    return decrease / bucketSize;
  }

  /**
   * Computes the squared deviation of a member's weighted load from the average.
   *
   * @param load the load on the member
   * @param weight the weight of the member
   * @param average the average weighted load
   * @return the square of (load / weight - average)
   */
  private double variance(double load, double weight, double average) {
    double offset = load / weight - average;
    return offset * offset;
  }

  /**
   * Searches every (source, hosted bucket, target) combination for the bucket move that most
   * reduces the variance of the weighted load. Targets that already host the bucket or will not
   * accept it are skipped, as are moves that do not beat the minimum worthwhile improvement or
   * that were already attempted.
   *
   * @return the best bucket move, or null if no move is worthwhile
   */
  public Move findBestBucketMove() {
    Move winner = null;
    double winningGain = 0;

    for (Member source : members.values()) {
      for (Bucket hosted : source.getBuckets()) {
        for (Member target : members.values()) {
          if (hosted.getMembersHosting().contains(target)
              || !target.willAcceptBucket(hosted, source, true).willAccept()) {
            continue;
          }
          double gain = improvement(source.getTotalLoad(), source.getWeight(),
              target.getTotalLoad(), target.getWeight(), hosted.getLoad(), getAverageLoad());
          if (!(gain > winningGain) || !(gain > getMinImprovement())) {
            continue;
          }
          Move candidate = new Move(source, target, hosted);
          if (attemptedBucketMoves.contains(candidate)) {
            continue;
          }
          winningGain = gain;
          winner = candidate;
        }
      }
    }
    return winner;
  }

  /**
   * Asks the bucket operator to move a bucket and, on success, updates the model: the target
   * becomes a host, takes over the primary if the source held it, and the source stops hosting
   * the bucket. The move is always added to the set of attempted bucket moves so it is not
   * proposed again.
   *
   * @param bestMove the bucket move to perform; its bucket must be a {@link BucketRollup}
   * @return true if the operator reported that the bucket was moved
   */
  public boolean moveBucket(Move bestMove) {
    Member from = bestMove.getSource();
    Member to = bestMove.getTarget();
    BucketRollup bucket = (BucketRollup) bestMove.getBucket();

    Map<String, Long> sizes = getColocatedRegionSizes(bucket);
    boolean moved = operator.moveBucket(from.getDistributedMember(), to.getDistributedMember(),
        bucket.getId(), sizes);

    if (moved) {
      bucket.addMember(to);
      boolean sourceWasPrimary = from.equals(bucket.getPrimary());
      if (sourceWasPrimary) {
        bucket.setPrimary(to, bucket.getPrimaryLoad());
      }
      bucket.removeMember(from);
    }

    // the same move must never be attempted twice, otherwise callers could loop forever
    Assert.assertTrue(attemptedBucketMoves.add(bestMove),
        "PartitionedRegionLoadModel.moveBuckets - excluded set is not growing, so we probably would have an infinite loop here");

    return moved;
  }

  /**
   * Return a snapshot of what the partitioned member details look like for one region.
   *
   * @param region the name of the colocated region to report on
   * @return a sorted set of partitioned member details, one per member that has the region
   */
  public Set<PartitionMemberInfo> getPartitionedMemberDetails(String region) {
    Set<PartitionMemberInfo> snapshot = new TreeSet<>();
    for (MemberRollup rollup : members.values()) {
      Member regionMember = rollup.getColocatedMember(region);
      if (regionMember == null) {
        // this member has no data for the requested region
        continue;
      }
      PartitionMemberInfo info = new PartitionMemberInfoImpl(regionMember.getDistributedMember(),
          regionMember.getConfiguredMaxMemory(), regionMember.getSize(),
          regionMember.getBucketCount(), regionMember.getPrimaryCount());
      snapshot.add(info);
    }
    return snapshot;
  }

  /**
   * For testing only, calculate the total variance of the members' total load.
   *
   * @return the sum over all members of the squared deviation from the average load
   */
  public float getVarianceForTest() {
    float total = 0;
    for (Member each : members.values()) {
      total += variance(each.getTotalLoad(), each.getWeight(), getAverageLoad());
    }
    return total;
  }

  /**
   * For testing only, calculate the total variance of the members' primary load.
   *
   * @return the sum over all members of the squared deviation from the average primary load
   */
  public float getPrimaryVarianceForTest() {
    float total = 0;
    for (Member each : members.values()) {
      total += variance(each.getPrimaryLoad(), each.getWeight(), getPrimaryAverage());
    }
    return total;
  }

  /**
   * Wait for the bucket operator to complete any pending asynchronous operations. This simply
   * delegates to the operator supplied to the constructor.
   */
  public void waitForOperations() {
    this.operator.waitForOperations();
  }

  /**
   * Renders the model as a text table: one row per member showing its primary load, size and
   * configured max memory in MB, followed by one column per bucket marked P (primary), R (hosted
   * but not primary) or X (not hosted). A final row shows the number of offline members per
   * bucket. Returns an empty string when the model has no members.
   */
  @Override
  public String toString() {
    if (members.isEmpty()) {
      return "";
    }

    // gather every hosted bucket (ordered by id) and the widest member id
    SortedSet<Bucket> columns = new TreeSet<>(Comparator.comparingInt(Bucket::getId));
    int idWidth = 0;
    for (Member each : members.values()) {
      columns.addAll(each.getBuckets());
      idWidth = Math.max(idWidth, each.getDistributedMember().toString().length());
    }

    StringBuilder table = new StringBuilder();

    // header row
    table.append(String.format("%" + idWidth + "s primaries size(MB)  max(MB)", "MemberId"));
    for (Bucket column : columns) {
      table.append(String.format("%4s", column.getId()));
    }

    // one row per member
    for (Member each : members.values()) {
      table.append(String.format("\n%" + idWidth + "s %9.0f %8.2f %8.2f",
          each.getDistributedMember(), each.getPrimaryLoad(),
          each.getSize() / (float) MEGABYTES,
          each.getConfiguredMaxMemory() / (float) MEGABYTES));
      for (Bucket column : columns) {
        char marker = each.getPrimaryBuckets().contains(column) ? 'P'
            : each.getBuckets().contains(column) ? 'R' : 'X';
        table.append("   ").append(marker);
      }
    }

    // footer row with the offline member count per bucket
    table.append(String.format("\n%" + idWidth + "s                            ",
        "#offline"));
    for (Bucket column : columns) {
      table.append(String.format("%4s", column.getOfflineMembers().size()));
    }

    return table.toString();
  }

}
