merged changes with master
diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
deleted file mode 100644
index 9a22daf..0000000
--- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package com.metamx.druid.master;
-
-import com.google.common.collect.MinMaxPriorityQueue;
-import com.metamx.common.Pair;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.client.DataSegment;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.Random;
-
-/**
- * The BalancerCostAnalyzer will compute the total initial cost of the cluster, with costs defined in
- * computeJointSegmentCosts.  It will then propose to move (pseudo-)randomly chosen segments from their
- * respective initial servers to other servers, chosen greedily to minimize the cost of the cluster.
- */
-public class BalancerCostAnalyzer
-{
-  private static final Logger log = new Logger(BalancerCostAnalyzer.class);
-  private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
-  private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
-  private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
-  private final Random rand;
-  private final DateTime referenceTimestamp;
-
-  public BalancerCostAnalyzer(DateTime referenceTimestamp)
-  {
-    this.referenceTimestamp = referenceTimestamp;
-    rand = new Random(0);
-  }
-
-  /**
-   * Calculates the cost normalization.  This is such that the normalized cost is lower bounded
-   * by 1 (e.g. when each segment gets its own compute node).
-   *
-   * @param serverHolders A list of ServerHolders for a particular tier.
-   *
-   * @return The normalization value (the sum of the diagonal entries in the
-   *         pairwise cost matrix).  This is the cost of a cluster if each
-   *         segment were to get its own compute node.
-   */
-  public double calculateNormalization(final List<ServerHolder> serverHolders)
-  {
-    double cost = 0;
-    for (ServerHolder server : serverHolders) {
-      for (DataSegment segment : server.getServer().getSegments().values()) {
-        cost += computeJointSegmentCosts(segment, segment);
-      }
-    }
-    return cost;
-  }
-
-  /**
-   * Calculates the initial cost of the Druid segment configuration.
-   *
-   * @param serverHolders A list of ServerHolders for a particular tier.
-   *
-   * @return The initial cost of the Druid tier.
-   */
-  public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
-  {
-    double cost = 0;
-    for (ServerHolder server : serverHolders) {
-      DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
-      for (int i = 0; i < segments.length; ++i) {
-        for (int j = i; j < segments.length; ++j) {
-          cost += computeJointSegmentCosts(segments[i], segments[j]);
-        }
-      }
-    }
-    return cost;
-  }
-
-  /**
-   * This defines the unnormalized cost function between two segments.  There is a base cost given by
-   * the minimum size of the two segments and additional penalties.
-   * recencyPenalty: it is more likely that recent segments will be queried together
-   * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved
-   * in the same queries
-   * gapPenalty: it is more likely that segments close together in time will be queried together
-   *
-   * @param segment1 The first DataSegment.
-   * @param segment2 The second DataSegment.
-   *
-   * @return The joint cost of placing the two DataSegments together on one node.
-   */
-  public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2)
-  {
-    final Interval gap = segment1.getInterval().gap(segment2.getInterval());
-
-    final double baseCost = Math.min(segment1.getSize(), segment2.getSize());
-    double recencyPenalty = 1;
-    double dataSourcePenalty = 1;
-    double gapPenalty = 1;
-
-    if (segment1.getDataSource().equals(segment2.getDataSource())) {
-      dataSourcePenalty = 2;
-    }
-
-    double maxDiff = Math.max(
-        referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
-        referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
-    );
-    if (maxDiff < SEVEN_DAYS_IN_MILLIS) {
-      recencyPenalty = 2 - maxDiff / SEVEN_DAYS_IN_MILLIS;
-    }
-
-    /** gap is null if the two segment intervals overlap or if they're adjacent */
-    if (gap == null) {
-      gapPenalty = 2;
-    } else {
-      long gapMillis = gap.toDurationMillis();
-      if (gapMillis < THIRTY_DAYS_IN_MILLIS) {
-        gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS;
-      }
-    }
-
-    final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty;
-
-    return cost;
-  }
-
-  /**
-   * The balancing application requires us to pick a proposal segment uniformly at random from the set of
-   * all servers.  We use reservoir sampling to do this.
-   *
-   * @param serverHolders A list of ServerHolders for a particular tier.
-   *
-   * @return A BalancerSegmentHolder sampled uniformly at random.
-   */
-  public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
-  {
-    ServerHolder fromServerHolder = null;
-    DataSegment proposalSegment = null;
-    int numSoFar = 0;
-
-    for (ServerHolder server : serverHolders) {
-      for (DataSegment segment : server.getServer().getSegments().values()) {
-        int randNum = rand.nextInt(numSoFar + 1);
-        // w.p. 1 / (numSoFar + 1), swap out the server and segment
-        if (randNum == numSoFar) {
-          fromServerHolder = server;
-          proposalSegment = segment;
-          numSoFar++;
-        }
-      }
-    }
-
-    return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
-  }
-
-  /**
-   * For balancing, we want to only make a move if the minimum cost server is not already serving the segment.
-   *
-   * @param proposalSegment A DataSegment that we are proposing to move.
-   * @param serverHolders   An iterable of ServerHolders for a particular tier.
-   *
-   * @return A ServerHolder with the new home for a segment.
-   */
-  public ServerHolder findNewSegmentHomeBalance(
-      final DataSegment proposalSegment,
-      final Iterable<ServerHolder> serverHolders
-  )
-  {
-    MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
-    if (costsAndServers.isEmpty()) {
-      return null;
-    }
-
-    ServerHolder toServer = costsAndServers.pollFirst().rhs;
-    if (!toServer.isServingSegment(proposalSegment)) {
-      return toServer;
-    }
-
-    return null;
-  }
-
-  /**
-   * For assignment, we want to move to the lowest cost server that isn't already serving the segment.
-   *
-   * @param proposalSegment A DataSegment that we are proposing to move.
-   * @param serverHolders   An iterable of ServerHolders for a particular tier.
-   *
-   * @return A ServerHolder with the new home for a segment.
-   */
-  public ServerHolder findNewSegmentHomeAssign(
-      final DataSegment proposalSegment,
-      final Iterable<ServerHolder> serverHolders
-  )
-  {
-    MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
-    while (!costsAndServers.isEmpty()) {
-      ServerHolder toServer = costsAndServers.pollFirst().rhs;
-      if (!toServer.isServingSegment(proposalSegment)) {
-        return toServer;
-      }
-    }
-
-    return null;
-  }
-
-  private MinMaxPriorityQueue<Pair<Double, ServerHolder>> computeCosts(
-      final DataSegment proposalSegment,
-      final Iterable<ServerHolder> serverHolders
-  )
-  {
-    MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
-        new Comparator<Pair<Double, ServerHolder>>()
-        {
-          @Override
-          public int compare(
-              Pair<Double, ServerHolder> o,
-              Pair<Double, ServerHolder> o1
-          )
-          {
-            return Double.compare(o.lhs, o1.lhs);
-          }
-        }
-    ).create();
-
-    final long proposalSegmentSize = proposalSegment.getSize();
-
-    for (ServerHolder server : serverHolders) {
-      /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
-      if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
-        continue;
-      }
-
-      /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
-      double cost = 0f;
-      /**  the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
-      for (DataSegment segment : server.getServer().getSegments().values()) {
-        if (!proposalSegment.equals(segment)) {
-          cost += computeJointSegmentCosts(proposalSegment, segment);
-        }
-      }
-      /**  plus the costs of segments that will be loaded */
-      for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
-        cost += computeJointSegmentCosts(proposalSegment, segment);
-      }
-
-      costsAndServers.add(Pair.of(cost, server));
-    }
-
-    return costsAndServers;
-  }
-
-}
diff --git a/server/src/main/java/com/metamx/druid/master/BalancerStrategy.java b/server/src/main/java/com/metamx/druid/master/BalancerStrategy.java
new file mode 100644
index 0000000..8494411
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/BalancerStrategy.java
@@ -0,0 +1,35 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012  Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+
+package com.metamx.druid.master;
+
+import com.metamx.druid.client.DataSegment;
+
+import java.util.List;
+
+public interface BalancerStrategy
+{
+  public ServerHolder findNewSegmentHomeBalancer(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
+
+  public ServerHolder findNewSegmentHomeReplicator(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
+
+  public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders);
+
+  public void emitStats(String tier, MasterStats stats, List<ServerHolder> serverHolderList);
+}
diff --git a/server/src/main/java/com/metamx/druid/master/BalancerStrategyFactory.java b/server/src/main/java/com/metamx/druid/master/BalancerStrategyFactory.java
new file mode 100644
index 0000000..982d23c
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/BalancerStrategyFactory.java
@@ -0,0 +1,26 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012  Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import org.joda.time.DateTime;
+
+public interface BalancerStrategyFactory
+{
+  public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp);
+}
diff --git a/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java
new file mode 100644
index 0000000..47f7fb1
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java
@@ -0,0 +1,256 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012  Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+
+package com.metamx.druid.master;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.metamx.common.Pair;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.emitter.EmittingLogger;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Comparator;
+import java.util.List;
+
+public class CostBalancerStrategy implements BalancerStrategy
+{
+  private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class);
+  private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
+  private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
+  private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
+  private final DateTime referenceTimestamp;
+
+  public CostBalancerStrategy(DateTime referenceTimestamp)
+  {
+    this.referenceTimestamp = referenceTimestamp;
+  }
+
+  @Override
+  public ServerHolder findNewSegmentHomeReplicator(
+      DataSegment proposalSegment, List<ServerHolder> serverHolders
+  )
+  {
+    ServerHolder holder= chooseBestServer(proposalSegment, serverHolders, false).rhs;
+    if (holder!=null && !holder.isServingSegment(proposalSegment))
+    {
+      return holder;
+    }
+    return null;
+  }
+
+
+  @Override
+  public ServerHolder findNewSegmentHomeBalancer(
+      DataSegment proposalSegment, List<ServerHolder> serverHolders
+  )
+  {
+    return chooseBestServer(proposalSegment, serverHolders, true).rhs;
+  }
+
+
+
+  /**
+   * For assignment, we want to move to the lowest cost server that isn't already serving the segment.
+   *
+   * @param proposalSegment A DataSegment that we are proposing to move.
+   * @param serverHolders   An iterable of ServerHolders for a particular tier.
+   *
+   * @return A ServerHolder with the new home for a segment.
+   */
+
+  private Pair<Double, ServerHolder> chooseBestServer(
+      final DataSegment proposalSegment,
+      final Iterable<ServerHolder> serverHolders,
+      boolean includeCurrentServer
+  )
+  {
+
+    Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
+    MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
+        new Comparator<Pair<Double, ServerHolder>>()
+        {
+          @Override
+          public int compare(
+              Pair<Double, ServerHolder> o,
+              Pair<Double, ServerHolder> o1
+          )
+          {
+            return Double.compare(o.lhs, o1.lhs);
+          }
+        }
+    ).create();
+
+    final long proposalSegmentSize = proposalSegment.getSize();
+
+    for (ServerHolder server : serverHolders) {
+      if (includeCurrentServer || !server.isServingSegment(proposalSegment))
+      {
+        /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
+        if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
+          continue;
+        }
+
+        /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
+        double cost = 0f;
+        /**  the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
+        for (DataSegment segment : server.getServer().getSegments().values()) {
+          if (!proposalSegment.equals(segment)) {
+            cost += computeJointSegmentCosts(proposalSegment, segment);
+          }
+        }
+        /**  plus the costs of segments that will be loaded */
+        for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
+          cost += computeJointSegmentCosts(proposalSegment, segment);
+        }
+
+        if (cost < bestServer.lhs) {
+          bestServer = Pair.of(cost, server);
+        }
+      }
+    }
+
+    return bestServer;
+  }
+
+  /**
+   * This defines the unnormalized cost function between two segments.  There is a base cost given by
+   * the minimum size of the two segments and additional penalties.
+   * recencyPenalty: it is more likely that recent segments will be queried together
+   * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved
+   * in the same queries
+   * gapPenalty: it is more likely that segments close together in time will be queried together
+   *
+   * @param segment1 The first DataSegment.
+   * @param segment2 The second DataSegment.
+   *
+   * @return The joint cost of placing the two DataSegments together on one node.
+   */
+  public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2)
+  {
+    final Interval gap = segment1.getInterval().gap(segment2.getInterval());
+
+    final double baseCost = Math.min(segment1.getSize(), segment2.getSize());
+    double recencyPenalty = 1;
+    double dataSourcePenalty = 1;
+    double gapPenalty = 1;
+
+    if (segment1.getDataSource().equals(segment2.getDataSource())) {
+      dataSourcePenalty = 2;
+    }
+
+    double maxDiff = Math.max(
+        referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
+        referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
+    );
+    double segment1diff=referenceTimestamp.getMillis()-segment1.getInterval().getEndMillis();
+    double segment2diff=referenceTimestamp.getMillis()-segment2.getInterval().getEndMillis();
+    if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff <SEVEN_DAYS_IN_MILLIS) {
+      recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS)*(2-segment2diff /SEVEN_DAYS_IN_MILLIS);
+    }
+
+    /** gap is null if the two segment intervals overlap or if they're adjacent */
+    if (gap == null) {
+      gapPenalty = 2;
+    } else {
+      long gapMillis = gap.toDurationMillis();
+      if (gapMillis < THIRTY_DAYS_IN_MILLIS) {
+        gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS;
+      }
+    }
+
+    final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty;
+
+    return cost;
+  }
+
+  public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
+  {
+    ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
+    return sampler.getRandomBalancerSegmentHolder(serverHolders);
+  }
+
+  /**
+   * Calculates the initial cost of the Druid segment configuration.
+   *
+   * @param serverHolders A list of ServerHolders for a particular tier.
+   *
+   * @return The initial cost of the Druid tier.
+   */
+  public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
+  {
+    double cost = 0;
+    for (ServerHolder server : serverHolders) {
+      DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
+      for (int i = 0; i < segments.length; ++i) {
+        for (int j = i; j < segments.length; ++j) {
+          cost += computeJointSegmentCosts(segments[i], segments[j]);
+        }
+      }
+    }
+    return cost;
+  }
+
+  /**
+   * Calculates the cost normalization.  This is such that the normalized cost is lower bounded
+   * by 1 (e.g. when each segment gets its own compute node).
+   *
+   * @param serverHolders A list of ServerHolders for a particular tier.
+   *
+   * @return The normalization value (the sum of the diagonal entries in the
+   *         pairwise cost matrix).  This is the cost of a cluster if each
+   *         segment were to get its own compute node.
+   */
+  public double calculateNormalization(final List<ServerHolder> serverHolders)
+  {
+    double cost = 0;
+    for (ServerHolder server : serverHolders) {
+      for (DataSegment segment : server.getServer().getSegments().values()) {
+        cost += computeJointSegmentCosts(segment, segment);
+      }
+    }
+    return cost;
+  }
+
+  @Override
+  public void emitStats(
+      String tier,
+      MasterStats stats, List<ServerHolder> serverHolderList
+  )
+  {
+    final double initialTotalCost = calculateInitialTotalCost(serverHolderList);
+    final double normalization = calculateNormalization(serverHolderList);
+    final double normalizedInitialCost = initialTotalCost / normalization;
+
+    stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
+    stats.addToTieredStat("normalization", tier, (long) normalization);
+    stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000));
+
+    log.info(
+        "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]",
+        tier,
+        initialTotalCost,
+        normalization,
+        normalizedInitialCost
+    );
+
+  }
+
+
+}
\ No newline at end of file
diff --git a/server/src/main/java/com/metamx/druid/master/CostBalancerStrategyFactory.java b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategyFactory.java
new file mode 100644
index 0000000..5acb634
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategyFactory.java
@@ -0,0 +1,31 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012  Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import org.joda.time.DateTime;
+
+public class CostBalancerStrategyFactory implements BalancerStrategyFactory
+{
+
+  @Override
+  public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp)
+  {
+    return new CostBalancerStrategy(referenceTimestamp);
+  }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
index 6a019a6..1303dbd 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
@@ -92,7 +92,6 @@
   private final IndexingServiceClient indexingServiceClient;
   private final ScheduledExecutorService exec;
   private final LoadQueueTaskMaster taskMaster;
-
   private final Map<String, LoadQueuePeon> loadManagementPeons;
   private final AtomicReference<LeaderLatch> leaderLatch;
   private volatile AtomicReference<MasterSegmentSettings> segmentSettingsAtomicReference;
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
index d1a96a0..889383a 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
@@ -78,7 +78,7 @@
   {
     final MasterStats stats = new MasterStats();
     final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
-    final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
+    final BalancerStrategy strategy = params.getBalancerStrategyFactory().getBalancerStrategy(referenceTimestamp);
     final int maxSegmentsToMove = params.getMasterSegmentSettings().getMaxSegmentsToMove();
 
     for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
@@ -113,34 +113,25 @@
       }
 
       for (int iter = 0; iter < maxSegmentsToMove; iter++) {
-        final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList);
+        final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
 
-        if (params.getAvailableSegments().contains(segmentToMove.getSegment())) {
-          final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
+        if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
+          final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
 
           if (holder != null) {
             moveSegment(segmentToMove, holder.getServer(), params);
           }
         }
       }
-
-      final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);
-      final double normalization = analyzer.calculateNormalization(serverHolderList);
-      final double normalizedInitialCost = initialTotalCost / normalization;
-
-      stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
-      stats.addToTieredStat("normalization", tier, (long) normalization);
-      stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000));
       stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
+      if (params.getMasterSegmentSettings().isEmitBalancingStats()) {
+        strategy.emitStats(tier, stats, serverHolderList);
 
+      }
       log.info(
-          "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]",
-          tier,
-          initialTotalCost,
-          normalization,
-          normalizedInitialCost,
-          currentlyMovingSegments.get(tier).size()
+          "[%s]: Segments Moved: [%d]", tier, currentlyMovingSegments.get(tier).size()
       );
+
     }
 
     return params.buildFromExisting()
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
index d514b4d..95b5c8d 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
@@ -42,12 +42,6 @@
   @Default("PT1800s")
   public abstract Duration getMasterSegmentMergerPeriod();
 
-  @Config("druid.master.millisToWaitBeforeDeleting")
-  public long getMillisToWaitBeforeDeleting()
-  {
-    return 15 * 60 * 1000L;
-  }
-
   @Config("druid.master.merger.on")
   public boolean isMergeSegments()
   {
@@ -66,26 +60,16 @@
     return null;
   }
 
-  @Config("druid.master.merge.threshold")
-  public long getMergeBytesLimit()
-  {
-    return 100000000L;
-  }
-
-  @Config("druid.master.merge.maxSegments")
-  public int getMergeSegmentsLimit()
-  {
-    return Integer.MAX_VALUE;
-  }
-
-  @Config("druid.master.balancer.maxSegmentsToMove")
-  @Default("5")
-  public abstract int getMaxSegmentsToMove();
-
   @Config("druid.master.replicant.lifetime")
   @Default("15")
   public abstract int getReplicantLifetime();
 
+  @Config("druid.master.masterSegmentSettings")
+  public MasterSegmentSettings getMasterSegmentSettings()
+  {
+    return new MasterSegmentSettings.Builder().build();
+  }
+
   @Config("druid.master.replicant.throttleLimit")
   @Default("10")
   public abstract int getReplicantThrottleLimit();
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
index 2bd4870..04a3ce5 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
@@ -68,7 +68,6 @@
     DatabaseRuleManager databaseRuleManager = paramsWithReplicationManager.getDatabaseRuleManager();
     for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
       List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
-
       boolean foundMatchingRule = false;
       for (Rule rule : rules) {
         if (rule.appliesTo(segment, now)) {
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
index 3c76b58..4ad153c 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
@@ -49,6 +49,7 @@
   private final MasterSegmentSettings masterSegmentSettings;
   private final MasterStats stats;
   private final DateTime balancerReferenceTimestamp;
+  private final BalancerStrategyFactory strategyFactory;
 
   public DruidMasterRuntimeParams(
       long startTime,
@@ -62,7 +63,8 @@
       ServiceEmitter emitter,
       MasterSegmentSettings masterSegmentSettings,
       MasterStats stats,
-      DateTime balancerReferenceTimestamp
+      DateTime balancerReferenceTimestamp,
+      BalancerStrategyFactory strategyFactory
   )
   {
     this.startTime = startTime;
@@ -77,6 +79,7 @@
     this.masterSegmentSettings = masterSegmentSettings;
     this.stats = stats;
     this.balancerReferenceTimestamp = balancerReferenceTimestamp;
+    this.strategyFactory = strategyFactory;
   }
 
   public long getStartTime()
@@ -139,9 +142,9 @@
     return balancerReferenceTimestamp;
   }
 
-  public BalancerCostAnalyzer getBalancerCostAnalyzer(DateTime referenceTimestamp)
+  public BalancerStrategyFactory getBalancerStrategyFactory()
   {
-    return new BalancerCostAnalyzer(referenceTimestamp);
+    return strategyFactory;
   }
 
   public boolean hasDeletionWaitTimeElapsed()
@@ -168,7 +171,8 @@
         emitter,
         masterSegmentSettings,
         stats,
-        balancerReferenceTimestamp
+        balancerReferenceTimestamp,
+        strategyFactory
     );
   }
 
@@ -186,6 +190,8 @@
     private MasterSegmentSettings masterSegmentSettings;
     private MasterStats stats;
     private DateTime balancerReferenceTimestamp;
+    private boolean emitBalancingCostParams;
+    private BalancerStrategyFactory strategyFactory;
 
     Builder()
     {
@@ -201,6 +207,8 @@
       this.stats = new MasterStats();
       this.masterSegmentSettings = new MasterSegmentSettings.Builder().build();
       this.balancerReferenceTimestamp = null;
+      this.emitBalancingCostParams = false;
+      this.strategyFactory = new CostBalancerStrategyFactory();
     }
 
     Builder(
@@ -215,7 +223,8 @@
         ServiceEmitter emitter,
         MasterSegmentSettings masterSegmentSettings,
         MasterStats stats,
-        DateTime balancerReferenceTimestamp
+        DateTime balancerReferenceTimestamp,
+        BalancerStrategyFactory strategyFactory
     )
     {
       this.startTime = startTime;
@@ -230,6 +239,8 @@
       this.masterSegmentSettings = masterSegmentSettings;
       this.stats = stats;
       this.balancerReferenceTimestamp = balancerReferenceTimestamp;
+      this.emitBalancingCostParams = emitBalancingCostParams;
+      this.strategyFactory=strategyFactory;
     }
 
     public DruidMasterRuntimeParams build()
@@ -246,10 +257,23 @@
           emitter,
           masterSegmentSettings,
           stats,
-          balancerReferenceTimestamp
+          balancerReferenceTimestamp,
+          strategyFactory
       );
     }
 
+    public Builder withBalancerStrategy(BalancerStrategyFactory strategyFactory)
+    {
+      this.strategyFactory = strategyFactory;
+      return this;
+    }
+
+    public Builder withEmitBalancingCostParams(boolean param)
+    {
+      emitBalancingCostParams = param;
+      return this;
+    }
+
     public Builder withStartTime(long time)
     {
       startTime = time;
@@ -321,5 +345,11 @@
       this.balancerReferenceTimestamp = balancerReferenceTimestamp;
       return this;
     }
+
+    public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory)
+    {
+      this.strategyFactory=strategyFactory;
+      return this;
+    }
   }
 }
diff --git a/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java b/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java
index cd6502b..75a56fa 100644
--- a/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java
+++ b/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java
@@ -28,19 +28,22 @@
   private long mergeBytesLimit= 100000000L;
   private int mergeSegmentsLimit = Integer.MAX_VALUE;
   private int maxSegmentsToMove = 5;
+  private boolean emitBalancingStats = false;
 
   @JsonCreator
   public MasterSegmentSettings(
       @JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting,
       @JsonProperty("mergeBytesLimit") Long mergeBytesLimit,
       @JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit,
-      @JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove
+      @JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove,
+      @JsonProperty("emitBalancingStats") Boolean emitBalancingStats
   )
   {
       this.maxSegmentsToMove=maxSegmentsToMove;
       this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting;
       this.mergeSegmentsLimit=mergeSegmentsLimit;
       this.mergeBytesLimit=mergeBytesLimit;
+      this.emitBalancingStats = emitBalancingStats;
   }
 
   public static String getConfigKey()
@@ -60,6 +63,11 @@
     return mergeBytesLimit;
   }
 
+  public boolean isEmitBalancingStats()
+  {
+    return emitBalancingStats;
+  }
+
   @JsonProperty
   public int getMergeSegmentsLimit()
   {
@@ -80,6 +88,7 @@
     private long mergeBytesLimit;
     private int mergeSegmentsLimit;
     private int maxSegmentsToMove;
+    private boolean emitBalancingStats;
 
     public Builder()
     {
@@ -87,14 +96,16 @@
       this.mergeBytesLimit= 100000000L;
       this.mergeSegmentsLimit= Integer.MAX_VALUE;
       this.maxSegmentsToMove = 5;
+      this.emitBalancingStats = false;
     }
 
-    public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove)
+    public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats)
     {
       this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
       this.mergeBytesLimit = mergeBytesLimit;
       this.mergeSegmentsLimit = mergeSegmentsLimit;
       this.maxSegmentsToMove = maxSegmentsToMove;
+      this.emitBalancingStats = emitBalancingStats;
     }
 
     public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
@@ -123,7 +134,7 @@
 
     public MasterSegmentSettings build()
     {
-      return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove);
+      return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove, emitBalancingStats);
     }
   }
 }
diff --git a/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategy.java b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategy.java
new file mode 100644
index 0000000..d953b69
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategy.java
@@ -0,0 +1,71 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012  Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+
+package com.metamx.druid.master;
+
+import com.metamx.druid.client.DataSegment;
+
+import java.util.List;
+import java.util.Random;
+
+public class RandomBalancerStrategy implements BalancerStrategy
+{
+  private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
+
+  @Override
+  public ServerHolder findNewSegmentHomeReplicator(
+      DataSegment proposalSegment, List<ServerHolder> serverHolders
+  )
+  {
+    if (serverHolders.size()==1)
+    {
+      return null;
+    }
+    else
+    {
+      ServerHolder holder = serverHolders.get(new Random().nextInt(serverHolders.size()));
+      while (holder.isServingSegment(proposalSegment))
+      {
+        holder = serverHolders.get(new Random().nextInt(serverHolders.size()));
+      }
+      return holder;
+    }
+  }
+
+  @Override
+  public ServerHolder findNewSegmentHomeBalancer(
+      DataSegment proposalSegment, List<ServerHolder> serverHolders
+  )
+  {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
+  {
+    return sampler.getRandomBalancerSegmentHolder(serverHolders);
+  }
+
+  @Override
+  public void emitStats(
+      String tier, MasterStats stats, List<ServerHolder> serverHolderList
+  )
+  {
+  }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategyFactory.java b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategyFactory.java
new file mode 100644
index 0000000..0cce3a1
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategyFactory.java
@@ -0,0 +1,30 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012  Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import org.joda.time.DateTime;
+
+public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
+{
+  @Override
+  public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp)
+  {
+    return new RandomBalancerStrategy();
+  }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/ReservoirSegmentSampler.java b/server/src/main/java/com/metamx/druid/master/ReservoirSegmentSampler.java
new file mode 100644
index 0000000..4db994b
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/ReservoirSegmentSampler.java
@@ -0,0 +1,54 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012  Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+
+package com.metamx.druid.master;
+
+import com.metamx.druid.client.DataSegment;
+
+import java.util.List;
+import java.util.Random;
+
+public class ReservoirSegmentSampler
+{
+
+  public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
+  {
+    final Random rand = new Random();
+    ServerHolder fromServerHolder = null;
+    DataSegment proposalSegment = null;
+    int numSoFar = 0;
+
+    for (ServerHolder server : serverHolders) {
+      for (DataSegment segment : server.getServer().getSegments().values()) {
+        int randNum = rand.nextInt(numSoFar + 1);
+        // w.p. 1 / (numSoFar+1), swap out the server and segment
+        if (randNum == numSoFar) {
+          fromServerHolder = server;
+          proposalSegment = segment;
+        }
+        numSoFar++;
+      }
+    }
+    if (fromServerHolder != null) {
+      return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index 8a9a014..64aa3df 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -22,7 +22,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.master.BalancerCostAnalyzer;
+import com.metamx.druid.master.BalancerStrategy;
 import com.metamx.druid.master.DruidMaster;
 import com.metamx.druid.master.DruidMasterRuntimeParams;
 import com.metamx.druid.master.LoadPeonCallback;
@@ -60,15 +60,14 @@
 
     final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
     final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
-    final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
-
+    final BalancerStrategy strategy = params.getBalancerStrategyFactory().getBalancerStrategy(referenceTimestamp);
     if (params.getAvailableSegments().contains(segment)) {
       stats.accumulate(
           assign(
               params.getReplicationManager(),
               expectedReplicants,
               totalReplicants,
-              analyzer,
+              strategy,
               serverHolderList,
               segment
           )
@@ -84,7 +83,7 @@
       final ReplicationThrottler replicationManager,
       final int expectedReplicants,
       int totalReplicants,
-      final BalancerCostAnalyzer analyzer,
+      final BalancerStrategy strategy,
       final List<ServerHolder> serverHolderList,
       final DataSegment segment
   )
@@ -98,7 +97,7 @@
         break;
       }
 
-      final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList);
+      final ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
 
       if (holder == null) {
         log.warn(
diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java
index 4f59483..8513c15 100644
--- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java
+++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java
@@ -132,6 +132,78 @@
     EasyMock.verify(druidServer4);
   }
 
+
+  @Test
+  public void testMoveToEmptyServerBalancer()
+  {
+    EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
+    EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
+    EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
+    EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
+    EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+    EasyMock.replay(druidServer1);
+
+    EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
+    EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
+    EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
+    EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
+    EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
+    EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+    EasyMock.replay(druidServer2);
+
+    EasyMock.replay(druidServer3);
+    EasyMock.replay(druidServer4);
+
+    // Mock stuff that the master needs
+    master.moveSegment(
+        EasyMock.<String>anyObject(),
+        EasyMock.<String>anyObject(),
+        EasyMock.<String>anyObject(),
+        EasyMock.<LoadPeonCallback>anyObject()
+    );
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(master);
+
+    LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+    LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
+
+    DruidMasterRuntimeParams params =
+        DruidMasterRuntimeParams.newBuilder()
+                                .withDruidCluster(
+                                    new DruidCluster(
+                                        ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
+                                            "normal",
+                                            MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
+                                                               .create(
+                                                                   Arrays.asList(
+                                                                       new ServerHolder(druidServer1, fromPeon),
+                                                                       new ServerHolder(druidServer2, toPeon)
+                                                                   )
+                                                               )
+                                        )
+                                    )
+                                )
+                                .withLoadManagementPeons(
+                                    ImmutableMap.<String, LoadQueuePeon>of(
+                                        "from",
+                                        fromPeon,
+                                        "to",
+                                        toPeon
+                                    )
+                                )
+                                .withAvailableSegments(segments.values())
+                                .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+                                .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
+                                .build();
+
+    params = new DruidMasterBalancerTester(master).run(params);
+    Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
+    Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size());
+  }
+
+
+
+
   @Test
   public void testRun1()
   {
@@ -203,7 +275,8 @@
     Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
   }
 
- @Test
+
+  @Test
   public void testRun2()
   {
     // Mock some servers of different usages
@@ -295,4 +368,5 @@
     params = new DruidMasterBalancerTester(master).run(params);
     Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
   }
+
 }
diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
index 213e350..59b727d 100644
--- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
+++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
@@ -36,7 +36,6 @@
 import org.junit.Test;
 
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
 
 /**
  */
@@ -96,21 +95,15 @@
           }
 
           @Override
-          public long getMillisToWaitBeforeDeleting()
-          {
-            return super.getMillisToWaitBeforeDeleting();
-          }
-
-          @Override
           public String getMergerServiceName()
           {
             return "";
           }
 
           @Override
-          public int getMaxSegmentsToMove()
+          public MasterSegmentSettings getMasterSegmentSettings()
           {
-            return 0;
+            return new MasterSegmentSettings.Builder().withMillisToWaitBeforeDeleting(super.getMasterSegmentSettings().getMillisToWaitBeforeDeleting()).withMaxSegmentsToMove(0).build();
           }
 
           @Override
diff --git a/server/src/test/java/com/metamx/druid/master/ReservoirSegmentSamplerTest.java b/server/src/test/java/com/metamx/druid/master/ReservoirSegmentSamplerTest.java
new file mode 100644
index 0000000..7b768e1
--- /dev/null
+++ b/server/src/test/java/com/metamx/druid/master/ReservoirSegmentSamplerTest.java
@@ -0,0 +1,208 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012  Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.client.DruidServer;
+import com.metamx.druid.shard.NoneShardSpec;
+import junit.framework.Assert;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class ReservoirSegmentSamplerTest
+{
+  private DruidServer druidServer1;
+  private DruidServer druidServer2;
+  private DruidServer druidServer3;
+  private DruidServer druidServer4;
+
+  private ServerHolder holder1;
+  private ServerHolder holder2;
+  private ServerHolder holder3;
+  private ServerHolder holder4;
+
+  private DataSegment segment1;
+  private DataSegment segment2;
+  private DataSegment segment3;
+  private DataSegment segment4;
+  Map<String, DataSegment> segmentsMap1;
+  Map<String, DataSegment> segmentsMap2;
+  Map<String, DataSegment> segmentsMap3;
+  Map<String, DataSegment> segmentsMap4;
+  List<DataSegment> segments;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    druidServer1 = EasyMock.createMock(DruidServer.class);
+    druidServer1 = EasyMock.createMock(DruidServer.class);
+    druidServer2 = EasyMock.createMock(DruidServer.class);
+    druidServer3 = EasyMock.createMock(DruidServer.class);
+    druidServer4 = EasyMock.createMock(DruidServer.class);
+    holder1 = EasyMock.createMock(ServerHolder.class);
+    holder2 = EasyMock.createMock(ServerHolder.class);
+    holder3 = EasyMock.createMock(ServerHolder.class);
+    holder4 = EasyMock.createMock(ServerHolder.class);
+    segment1 = EasyMock.createMock(DataSegment.class);
+    segment2 = EasyMock.createMock(DataSegment.class);
+    segment3 = EasyMock.createMock(DataSegment.class);
+    segment4 = EasyMock.createMock(DataSegment.class);
+
+    DateTime start1 = new DateTime("2012-01-01");
+    DateTime start2 = new DateTime("2012-02-01");
+    DateTime version = new DateTime("2012-03-01");
+    segment1 = new DataSegment(
+        "datasource1",
+        new Interval(start1, start1.plusHours(1)),
+        version.toString(),
+        Maps.<String, Object>newHashMap(),
+        Lists.<String>newArrayList(),
+        Lists.<String>newArrayList(),
+        new NoneShardSpec(),
+        0,
+        11L
+    );
+    segment2 = new DataSegment(
+        "datasource1",
+        new Interval(start2, start2.plusHours(1)),
+        version.toString(),
+        Maps.<String, Object>newHashMap(),
+        Lists.<String>newArrayList(),
+        Lists.<String>newArrayList(),
+        new NoneShardSpec(),
+        0,
+        7L
+    );
+    segment3 = new DataSegment(
+        "datasource2",
+        new Interval(start1, start1.plusHours(1)),
+        version.toString(),
+        Maps.<String, Object>newHashMap(),
+        Lists.<String>newArrayList(),
+        Lists.<String>newArrayList(),
+        new NoneShardSpec(),
+        0,
+        4L
+    );
+    segment4 = new DataSegment(
+        "datasource2",
+        new Interval(start2, start2.plusHours(1)),
+        version.toString(),
+        Maps.<String, Object>newHashMap(),
+        Lists.<String>newArrayList(),
+        Lists.<String>newArrayList(),
+        new NoneShardSpec(),
+        0,
+        8L
+    );
+
+    segments = Lists.newArrayList(segment1, segment2, segment3, segment4);
+
+    segmentsMap1 = ImmutableMap.<String, DataSegment>of(
+        "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
+        segment1
+    );
+    segmentsMap2 = ImmutableMap.<String, DataSegment>of(
+        "datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
+        segment2
+    );
+    segmentsMap3 = ImmutableMap.<String, DataSegment>of(
+        "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
+        segment3
+    );
+    segmentsMap4 = ImmutableMap.<String, DataSegment>of(
+        "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
+        segment4
+    );
+  }
+
+  //checks if every segment is selected at least once out of 5000 trials
+  @Test
+  public void getRandomBalancerSegmentHolderTest()
+  {
+    EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
+    EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
+    EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
+    EasyMock.expect(druidServer1.getSegments()).andReturn(segmentsMap1).anyTimes();
+    EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+    EasyMock.replay(druidServer1);
+
+    EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
+    EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
+    EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce();
+    EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
+    EasyMock.expect(druidServer2.getSegments()).andReturn(segmentsMap2).anyTimes();
+    EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+    EasyMock.replay(druidServer2);
+
+    EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
+    EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
+    EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce();
+    EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
+    EasyMock.expect(druidServer3.getSegments()).andReturn(segmentsMap3).anyTimes();
+    EasyMock.expect(druidServer3.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+    EasyMock.replay(druidServer3);
+
+    EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
+    EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
+    EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce();
+    EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
+    EasyMock.expect(druidServer4.getSegments()).andReturn(segmentsMap4).anyTimes();
+    EasyMock.expect(druidServer4.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+    EasyMock.replay(druidServer4);
+
+    EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
+    EasyMock.replay(holder1);
+    EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
+    EasyMock.replay(holder2);
+
+    EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
+    EasyMock.replay(holder3);
+
+    EasyMock.expect(holder4.getServer()).andReturn(druidServer4).anyTimes();
+    EasyMock.replay(holder4);
+
+    List<ServerHolder> holderList = Lists.newArrayList();
+    holderList.add(holder1);
+    holderList.add(holder2);
+    holderList.add(holder3);
+    holderList.add(holder4);
+
+    Map<DataSegment, Integer> segmentCountMap = Maps.newHashMap();
+    ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
+    for (int i = 0; i < 5000; i++) {
+      segmentCountMap.put(sampler.getRandomBalancerSegmentHolder(holderList).getSegment(), 1);
+    }
+
+    for (DataSegment segment : segments) {
+      Assert.assertEquals(segmentCountMap.get(segment), new Integer(1));
+    }
+
+
+  }
+}
diff --git a/server/src/test/java/com/metamx/druid/utils/DruidMasterBalancerProfiler.java b/server/src/test/java/com/metamx/druid/utils/DruidMasterBalancerProfiler.java
new file mode 100644
index 0000000..482274c
--- /dev/null
+++ b/server/src/test/java/com/metamx/druid/utils/DruidMasterBalancerProfiler.java
@@ -0,0 +1,249 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012  Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+
+package com.metamx.druid.utils;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.client.DruidServer;
+import com.metamx.druid.db.DatabaseRuleManager;
+import com.metamx.druid.master.DruidCluster;
+import com.metamx.druid.master.DruidMaster;
+import com.metamx.druid.master.DruidMasterBalancerTester;
+import com.metamx.druid.master.DruidMasterRuleRunner;
+import com.metamx.druid.master.DruidMasterRuntimeParams;
+import com.metamx.druid.master.LoadPeonCallback;
+import com.metamx.druid.master.LoadQueuePeon;
+import com.metamx.druid.master.LoadQueuePeonTester;
+import com.metamx.druid.master.MasterSegmentSettings;
+import com.metamx.druid.master.ReplicationThrottler;
+import com.metamx.druid.master.SegmentReplicantLookup;
+import com.metamx.druid.master.ServerHolder;
+import com.metamx.druid.master.rules.PeriodLoadRule;
+import com.metamx.druid.master.rules.Rule;
+import com.metamx.druid.shard.NoneShardSpec;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.service.ServiceEmitter;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DruidMasterBalancerProfiler
+{
+  private static final int MAX_SEGMENTS_TO_MOVE = 5;
+  private DruidMaster master;
+  private DruidServer druidServer1;
+  private DruidServer druidServer2;
+  Map<String, DataSegment> segments = Maps.newHashMap();
+  ServiceEmitter emitter;
+  DatabaseRuleManager manager;
+  PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"),3,"normal");
+  List<Rule> rules = ImmutableList.<Rule>of(loadRule);
+
+  @Before
+  public void setUp() throws Exception
+  {
+    master = EasyMock.createMock(DruidMaster.class);
+    druidServer1 = EasyMock.createMock(DruidServer.class);
+    druidServer2 = EasyMock.createMock(DruidServer.class);
+    emitter = EasyMock.createMock(ServiceEmitter.class);
+    EmittingLogger.registerEmitter(emitter);
+    manager = EasyMock.createMock(DatabaseRuleManager.class);
+  }
+
+  public void bigProfiler()
+  {
+    Stopwatch watch = new Stopwatch();
+    int numSegments = 55000;
+    int numServers=50;
+    EasyMock.expect(manager.getAllRules()).andReturn(ImmutableMap.<String, List<Rule>>of("test", rules)).anyTimes();
+    EasyMock.expect(manager.getRules(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
+    EasyMock.expect(manager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
+    EasyMock.replay(manager);
+
+    master.moveSegment(
+        EasyMock.<String>anyObject(),
+        EasyMock.<String>anyObject(),
+        EasyMock.<String>anyObject(),
+        EasyMock.<LoadPeonCallback>anyObject()
+    );
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(master);
+
+    List<DruidServer> serverList = Lists.newArrayList();
+    Map<String, LoadQueuePeon> peonMap = Maps.newHashMap();
+    List<ServerHolder> serverHolderList = Lists.newArrayList();
+    Map<String,DataSegment> segmentMap = Maps.newHashMap();
+    for (int i=0;i<numSegments;i++)
+    {
+      segmentMap.put(
+          "segment" + i,
+          new DataSegment(
+              "datasource" + i,
+              new Interval(new DateTime("2012-01-01"), (new DateTime("2012-01-01")).plusHours(1)),
+              (new DateTime("2012-03-01")).toString(),
+              Maps.<String, Object>newHashMap(),
+              Lists.<String>newArrayList(),
+              Lists.<String>newArrayList(),
+              new NoneShardSpec(),
+              0,
+              4L
+          )
+      );
+    }
+
+    for (int i=0;i<numServers;i++)
+    {
+      DruidServer server =EasyMock.createMock(DruidServer.class);
+      EasyMock.expect(server.getMetadata()).andReturn(null).anyTimes();
+      EasyMock.expect(server.getCurrSize()).andReturn(30L).atLeastOnce();
+      EasyMock.expect(server.getMaxSize()).andReturn(100L).atLeastOnce();
+      EasyMock.expect(server.getTier()).andReturn("normal").anyTimes();
+      EasyMock.expect(server.getName()).andReturn(Integer.toString(i)).atLeastOnce();
+      EasyMock.expect(server.getHost()).andReturn(Integer.toString(i)).anyTimes();
+      if (i==0)
+      {
+        EasyMock.expect(server.getSegments()).andReturn(segmentMap).anyTimes();
+      }
+      else
+      {
+        EasyMock.expect(server.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
+      }
+      EasyMock.expect(server.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+      EasyMock.replay(server);
+
+      LoadQueuePeon peon = new LoadQueuePeonTester();
+      peonMap.put(Integer.toString(i),peon);
+      serverHolderList.add(new ServerHolder(server, peon));
+          }
+
+    DruidMasterRuntimeParams params =
+        DruidMasterRuntimeParams.newBuilder()
+                                .withDruidCluster(
+                                    new DruidCluster(
+                                        ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
+                                            "normal",
+                                            MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
+                                                               .create(
+                                                                   serverHolderList
+                                                               )
+                                        )
+                                    )
+                                )
+                                .withLoadManagementPeons(
+                                    peonMap
+                                )
+                                .withAvailableSegments(segmentMap.values())
+                                .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+                                .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
+                                .withEmitter(emitter)
+                                .withDatabaseRuleManager(manager)
+                                .withReplicationManager(new ReplicationThrottler(2, 500))
+                                .withSegmentReplicantLookup(
+                                    SegmentReplicantLookup.make(new DruidCluster(
+                                        ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
+                                            "normal",
+                                            MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
+                                                               .create(
+                                                                   serverHolderList
+                                                               )
+                                        )
+                                    )
+                                    )
+                                )
+                                .build();
+
+    DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);
+    DruidMasterRuleRunner runner = new DruidMasterRuleRunner(master,500,5);
+    watch.start();
+    DruidMasterRuntimeParams balanceParams = tester.run(params);
+    DruidMasterRuntimeParams assignParams = runner.run(params);
+    System.out.println(watch.stop());
+  }
+
+
+  public void profileRun(){
+  Stopwatch watch = new Stopwatch();
+  LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+  LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
+
+  EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
+  EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
+  EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
+  EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
+  EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+  EasyMock.replay(druidServer1);
+
+  EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
+  EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
+  EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
+  EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
+  EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
+  EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+  EasyMock.replay(druidServer2);
+
+  master.moveSegment(
+      EasyMock.<String>anyObject(),
+      EasyMock.<String>anyObject(),
+      EasyMock.<String>anyObject(),
+      EasyMock.<LoadPeonCallback>anyObject()
+  );
+  EasyMock.expectLastCall().anyTimes();
+  EasyMock.replay(master);
+
+  DruidMasterRuntimeParams params =
+      DruidMasterRuntimeParams.newBuilder()
+                              .withDruidCluster(
+                                  new DruidCluster(
+                                      ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
+                                          "normal",
+                                          MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
+                                                             .create(
+                                                                 Arrays.asList(
+                                                                     new ServerHolder(druidServer1, fromPeon),
+                                                                     new ServerHolder(druidServer2, toPeon)
+                                                                 )
+                                                             )
+                                      )
+                                  )
+                              )
+                              .withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("from", fromPeon, "to", toPeon))
+                              .withAvailableSegments(segments.values())
+                              .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+                              .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
+                              .build();
+    DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);
+    watch.start();
+    DruidMasterRuntimeParams balanceParams = tester.run(params);
+    System.out.println(watch.stop());
+  }
+
+}