Merged changes with master.
diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
deleted file mode 100644
index 9a22daf..0000000
--- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.master;
-
-import com.google.common.collect.MinMaxPriorityQueue;
-import com.metamx.common.Pair;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.client.DataSegment;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.Random;
-
-/**
- * The BalancerCostAnalyzer will compute the total initial cost of the cluster, with costs defined in
- * computeJointSegmentCosts. It will then propose to move (pseudo-)randomly chosen segments from their
- * respective initial servers to other servers, chosen greedily to minimize the cost of the cluster.
- */
-public class BalancerCostAnalyzer
-{
- private static final Logger log = new Logger(BalancerCostAnalyzer.class);
- private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
- private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
- private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
- private final Random rand;
- private final DateTime referenceTimestamp;
-
- public BalancerCostAnalyzer(DateTime referenceTimestamp)
- {
- this.referenceTimestamp = referenceTimestamp;
- rand = new Random(0);
- }
-
- /**
- * Calculates the cost normalization. This is such that the normalized cost is lower bounded
- * by 1 (e.g. when each segment gets its own compute node).
- *
- * @param serverHolders A list of ServerHolders for a particular tier.
- *
- * @return The normalization value (the sum of the diagonal entries in the
- * pairwise cost matrix). This is the cost of a cluster if each
- * segment were to get its own compute node.
- */
- public double calculateNormalization(final List<ServerHolder> serverHolders)
- {
- double cost = 0;
- for (ServerHolder server : serverHolders) {
- for (DataSegment segment : server.getServer().getSegments().values()) {
- cost += computeJointSegmentCosts(segment, segment);
- }
- }
- return cost;
- }
-
- /**
- * Calculates the initial cost of the Druid segment configuration.
- *
- * @param serverHolders A list of ServerHolders for a particular tier.
- *
- * @return The initial cost of the Druid tier.
- */
- public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
- {
- double cost = 0;
- for (ServerHolder server : serverHolders) {
- DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
- for (int i = 0; i < segments.length; ++i) {
- for (int j = i; j < segments.length; ++j) {
- cost += computeJointSegmentCosts(segments[i], segments[j]);
- }
- }
- }
- return cost;
- }
-
- /**
- * This defines the unnormalized cost function between two segments. There is a base cost given by
- * the minimum size of the two segments and additional penalties.
- * recencyPenalty: it is more likely that recent segments will be queried together
- * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved
- * in the same queries
- * gapPenalty: it is more likely that segments close together in time will be queried together
- *
- * @param segment1 The first DataSegment.
- * @param segment2 The second DataSegment.
- *
- * @return The joint cost of placing the two DataSegments together on one node.
- */
- public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2)
- {
- final Interval gap = segment1.getInterval().gap(segment2.getInterval());
-
- final double baseCost = Math.min(segment1.getSize(), segment2.getSize());
- double recencyPenalty = 1;
- double dataSourcePenalty = 1;
- double gapPenalty = 1;
-
- if (segment1.getDataSource().equals(segment2.getDataSource())) {
- dataSourcePenalty = 2;
- }
-
- double maxDiff = Math.max(
- referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
- referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
- );
- if (maxDiff < SEVEN_DAYS_IN_MILLIS) {
- recencyPenalty = 2 - maxDiff / SEVEN_DAYS_IN_MILLIS;
- }
-
- /** gap is null if the two segment intervals overlap or if they're adjacent */
- if (gap == null) {
- gapPenalty = 2;
- } else {
- long gapMillis = gap.toDurationMillis();
- if (gapMillis < THIRTY_DAYS_IN_MILLIS) {
- gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS;
- }
- }
-
- final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty;
-
- return cost;
- }
-
- /**
- * The balancing application requires us to pick a proposal segment uniformly at random from the set of
- * all servers. We use reservoir sampling to do this.
- *
- * @param serverHolders A list of ServerHolders for a particular tier.
- *
- * @return A BalancerSegmentHolder sampled uniformly at random.
- */
- public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
- {
- ServerHolder fromServerHolder = null;
- DataSegment proposalSegment = null;
- int numSoFar = 0;
-
- for (ServerHolder server : serverHolders) {
- for (DataSegment segment : server.getServer().getSegments().values()) {
- int randNum = rand.nextInt(numSoFar + 1);
- // w.p. 1 / (numSoFar + 1), swap out the server and segment
- if (randNum == numSoFar) {
- fromServerHolder = server;
- proposalSegment = segment;
- numSoFar++;
- }
- }
- }
-
- return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
- }
-
- /**
- * For balancing, we want to only make a move if the minimum cost server is not already serving the segment.
- *
- * @param proposalSegment A DataSegment that we are proposing to move.
- * @param serverHolders An iterable of ServerHolders for a particular tier.
- *
- * @return A ServerHolder with the new home for a segment.
- */
- public ServerHolder findNewSegmentHomeBalance(
- final DataSegment proposalSegment,
- final Iterable<ServerHolder> serverHolders
- )
- {
- MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
- if (costsAndServers.isEmpty()) {
- return null;
- }
-
- ServerHolder toServer = costsAndServers.pollFirst().rhs;
- if (!toServer.isServingSegment(proposalSegment)) {
- return toServer;
- }
-
- return null;
- }
-
- /**
- * For assignment, we want to move to the lowest cost server that isn't already serving the segment.
- *
- * @param proposalSegment A DataSegment that we are proposing to move.
- * @param serverHolders An iterable of ServerHolders for a particular tier.
- *
- * @return A ServerHolder with the new home for a segment.
- */
- public ServerHolder findNewSegmentHomeAssign(
- final DataSegment proposalSegment,
- final Iterable<ServerHolder> serverHolders
- )
- {
- MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
- while (!costsAndServers.isEmpty()) {
- ServerHolder toServer = costsAndServers.pollFirst().rhs;
- if (!toServer.isServingSegment(proposalSegment)) {
- return toServer;
- }
- }
-
- return null;
- }
-
- private MinMaxPriorityQueue<Pair<Double, ServerHolder>> computeCosts(
- final DataSegment proposalSegment,
- final Iterable<ServerHolder> serverHolders
- )
- {
- MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
- new Comparator<Pair<Double, ServerHolder>>()
- {
- @Override
- public int compare(
- Pair<Double, ServerHolder> o,
- Pair<Double, ServerHolder> o1
- )
- {
- return Double.compare(o.lhs, o1.lhs);
- }
- }
- ).create();
-
- final long proposalSegmentSize = proposalSegment.getSize();
-
- for (ServerHolder server : serverHolders) {
- /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
- if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
- continue;
- }
-
- /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
- double cost = 0f;
- /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
- for (DataSegment segment : server.getServer().getSegments().values()) {
- if (!proposalSegment.equals(segment)) {
- cost += computeJointSegmentCosts(proposalSegment, segment);
- }
- }
- /** plus the costs of segments that will be loaded */
- for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
- cost += computeJointSegmentCosts(proposalSegment, segment);
- }
-
- costsAndServers.add(Pair.of(cost, server));
- }
-
- return costsAndServers;
- }
-
-}
diff --git a/server/src/main/java/com/metamx/druid/master/BalancerStrategy.java b/server/src/main/java/com/metamx/druid/master/BalancerStrategy.java
new file mode 100644
index 0000000..8494411
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/BalancerStrategy.java
@@ -0,0 +1,35 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012 Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+
+package com.metamx.druid.master;
+
+import com.metamx.druid.client.DataSegment;
+
+import java.util.List;
+
+public interface BalancerStrategy
+{
+ public ServerHolder findNewSegmentHomeBalancer(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
+
+ public ServerHolder findNewSegmentHomeReplicator(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
+
+ public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders);
+
+ public void emitStats(String tier, MasterStats stats, List<ServerHolder> serverHolderList);
+}
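Note (illustrative, not part of the patch): the interface above decouples the master from any particular placement policy. A minimal sketch of what an implementation must provide, here a hypothetical NoopBalancerStrategy that simply declines to place or move anything; the real implementations (CostBalancerStrategy, RandomBalancerStrategy) follow later in this diff.

package com.metamx.druid.master;

import com.metamx.druid.client.DataSegment;

import java.util.List;

// Hypothetical example only: a strategy that never proposes placements or moves.
public class NoopBalancerStrategy implements BalancerStrategy
{
  @Override
  public ServerHolder findNewSegmentHomeBalancer(final DataSegment proposalSegment, final List<ServerHolder> serverHolders)
  {
    return null; // never propose a balancing move
  }

  @Override
  public ServerHolder findNewSegmentHomeReplicator(final DataSegment proposalSegment, final List<ServerHolder> serverHolders)
  {
    return null; // never propose a replica target
  }

  @Override
  public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
  {
    return null; // nothing to move
  }

  @Override
  public void emitStats(String tier, MasterStats stats, List<ServerHolder> serverHolderList)
  {
    // no stats for this strategy
  }
}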
diff --git a/server/src/main/java/com/metamx/druid/master/BalancerStrategyFactory.java b/server/src/main/java/com/metamx/druid/master/BalancerStrategyFactory.java
new file mode 100644
index 0000000..982d23c
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/BalancerStrategyFactory.java
@@ -0,0 +1,26 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012 Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import org.joda.time.DateTime;
+
+public interface BalancerStrategyFactory
+{
+ public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp);
+}
diff --git a/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java
new file mode 100644
index 0000000..47f7fb1
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java
@@ -0,0 +1,256 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012 Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+
+package com.metamx.druid.master;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.metamx.common.Pair;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.emitter.EmittingLogger;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Comparator;
+import java.util.List;
+
+public class CostBalancerStrategy implements BalancerStrategy
+{
+ private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class);
+ // Use long arithmetic: 30 days in milliseconds overflows an int.
+ private static final long DAY_IN_MILLIS = 1000L * 60 * 60 * 24;
+ private static final long SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
+ private static final long THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
+ private final DateTime referenceTimestamp;
+
+ public CostBalancerStrategy(DateTime referenceTimestamp)
+ {
+ this.referenceTimestamp = referenceTimestamp;
+ }
+
+ @Override
+ public ServerHolder findNewSegmentHomeReplicator(
+ DataSegment proposalSegment, List<ServerHolder> serverHolders
+ )
+ {
+ ServerHolder holder = chooseBestServer(proposalSegment, serverHolders, false).rhs;
+ if (holder != null && !holder.isServingSegment(proposalSegment)) {
+ return holder;
+ }
+ return null;
+ }
+
+
+ @Override
+ public ServerHolder findNewSegmentHomeBalancer(
+ DataSegment proposalSegment, List<ServerHolder> serverHolders
+ )
+ {
+ return chooseBestServer(proposalSegment, serverHolders, true).rhs;
+ }
+
+
+
+ /**
+ * Finds the minimum-cost server for a segment.
+ *
+ * @param proposalSegment A DataSegment that we are proposing to move.
+ * @param serverHolders An iterable of ServerHolders for a particular tier.
+ * @param includeCurrentServer Whether servers already serving the segment may be returned
+ * (true when balancing, false when assigning replicas).
+ *
+ * @return A Pair of the lowest cost found and the ServerHolder that would be the new home for the segment.
+ */
+
+ private Pair<Double, ServerHolder> chooseBestServer(
+ final DataSegment proposalSegment,
+ final Iterable<ServerHolder> serverHolders,
+ boolean includeCurrentServer
+ )
+ {
+
+ Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
+ MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
+ new Comparator<Pair<Double, ServerHolder>>()
+ {
+ @Override
+ public int compare(
+ Pair<Double, ServerHolder> o,
+ Pair<Double, ServerHolder> o1
+ )
+ {
+ return Double.compare(o.lhs, o1.lhs);
+ }
+ }
+ ).create();
+
+ final long proposalSegmentSize = proposalSegment.getSize();
+
+ for (ServerHolder server : serverHolders) {
+ if (includeCurrentServer || !server.isServingSegment(proposalSegment)) {
+ /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
+ if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
+ continue;
+ }
+
+ /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
+ double cost = 0f;
+ /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
+ for (DataSegment segment : server.getServer().getSegments().values()) {
+ if (!proposalSegment.equals(segment)) {
+ cost += computeJointSegmentCosts(proposalSegment, segment);
+ }
+ }
+ /** plus the costs of segments that will be loaded */
+ for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
+ cost += computeJointSegmentCosts(proposalSegment, segment);
+ }
+
+ if (cost < bestServer.lhs) {
+ bestServer = Pair.of(cost, server);
+ }
+ }
+ }
+
+ return bestServer;
+ }
+
+ /**
+ * This defines the unnormalized cost function between two segments. There is a base cost given by
+ * the minimum size of the two segments and additional penalties.
+ * recencyPenalty: it is more likely that recent segments will be queried together
+ * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved
+ * in the same queries
+ * gapPenalty: it is more likely that segments close together in time will be queried together
+ *
+ * @param segment1 The first DataSegment.
+ * @param segment2 The second DataSegment.
+ *
+ * @return The joint cost of placing the two DataSegments together on one node.
+ */
+ public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2)
+ {
+ final Interval gap = segment1.getInterval().gap(segment2.getInterval());
+
+ final double baseCost = Math.min(segment1.getSize(), segment2.getSize());
+ double recencyPenalty = 1;
+ double dataSourcePenalty = 1;
+ double gapPenalty = 1;
+
+ if (segment1.getDataSource().equals(segment2.getDataSource())) {
+ dataSourcePenalty = 2;
+ }
+
+ double maxDiff = Math.max(
+ referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
+ referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
+ );
+ double segment1diff = referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis();
+ double segment2diff = referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis();
+ if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) {
+ recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS);
+ }
+
+ /** gap is null if the two segment intervals overlap or if they're adjacent */
+ if (gap == null) {
+ gapPenalty = 2;
+ } else {
+ long gapMillis = gap.toDurationMillis();
+ if (gapMillis < THIRTY_DAYS_IN_MILLIS) {
+ gapPenalty = 2 - (double) gapMillis / THIRTY_DAYS_IN_MILLIS; // floating-point division, otherwise the quotient truncates to 0
+ }
+ }
+
+ final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty;
+
+ return cost;
+ }
+
+ public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
+ {
+ ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
+ return sampler.getRandomBalancerSegmentHolder(serverHolders);
+ }
+
+ /**
+ * Calculates the initial cost of the Druid segment configuration.
+ *
+ * @param serverHolders A list of ServerHolders for a particular tier.
+ *
+ * @return The initial cost of the Druid tier.
+ */
+ public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
+ {
+ double cost = 0;
+ for (ServerHolder server : serverHolders) {
+ DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
+ for (int i = 0; i < segments.length; ++i) {
+ for (int j = i; j < segments.length; ++j) {
+ cost += computeJointSegmentCosts(segments[i], segments[j]);
+ }
+ }
+ }
+ return cost;
+ }
+
+ /**
+ * Calculates the cost normalization. This is such that the normalized cost is lower bounded
+ * by 1 (e.g. when each segment gets its own compute node).
+ *
+ * @param serverHolders A list of ServerHolders for a particular tier.
+ *
+ * @return The normalization value (the sum of the diagonal entries in the
+ * pairwise cost matrix). This is the cost of a cluster if each
+ * segment were to get its own compute node.
+ */
+ public double calculateNormalization(final List<ServerHolder> serverHolders)
+ {
+ double cost = 0;
+ for (ServerHolder server : serverHolders) {
+ for (DataSegment segment : server.getServer().getSegments().values()) {
+ cost += computeJointSegmentCosts(segment, segment);
+ }
+ }
+ return cost;
+ }
+
+ @Override
+ public void emitStats(
+ String tier,
+ MasterStats stats, List<ServerHolder> serverHolderList
+ )
+ {
+ final double initialTotalCost = calculateInitialTotalCost(serverHolderList);
+ final double normalization = calculateNormalization(serverHolderList);
+ final double normalizedInitialCost = initialTotalCost / normalization;
+
+ stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
+ stats.addToTieredStat("normalization", tier, (long) normalization);
+ stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000));
+
+ log.info(
+ "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]",
+ tier,
+ initialTotalCost,
+ normalization,
+ normalizedInitialCost
+ );
+
+ }
+
+
+}
\ No newline at end of file
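Note (illustrative, not part of the patch): a standalone arithmetic check of computeJointSegmentCosts above, using hypothetical sizes and dates, showing how the penalties combine multiplicatively.

// Hypothetical worked example of the cost formula; the class name and inputs are made up.
public class JointCostExample
{
  public static void main(String[] args)
  {
    final double DAY = 24 * 60 * 60 * 1000d;
    final double SEVEN_DAYS = 7 * DAY;
    final double THIRTY_DAYS = 30 * DAY;

    // Two segments from the same data source: 10 GB and 5 GB, ending 1 and 3 days
    // before the reference timestamp, with a 2-day gap between their intervals.
    final double baseCost = Math.min(10e9, 5e9);                          // 5e9
    final double dataSourcePenalty = 2;                                   // same data source
    final double recencyPenalty = (2 - (1 * DAY) / SEVEN_DAYS) * (2 - (3 * DAY) / SEVEN_DAYS); // ~2.92
    final double gapPenalty = 2 - (2 * DAY) / THIRTY_DAYS;                // ~1.93, gap shorter than 30 days

    // ~= 5e9 * 2 * 2.92 * 1.93 ~= 5.6e10
    System.out.println(baseCost * dataSourcePenalty * recencyPenalty * gapPenalty);
  }
}

Both segments being under seven days old roughly triples the base contribution, and a small gap nearly doubles it again, which is what pushes recent, related segments onto different nodes.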
diff --git a/server/src/main/java/com/metamx/druid/master/CostBalancerStrategyFactory.java b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategyFactory.java
new file mode 100644
index 0000000..5acb634
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategyFactory.java
@@ -0,0 +1,31 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012 Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import org.joda.time.DateTime;
+
+public class CostBalancerStrategyFactory implements BalancerStrategyFactory
+{
+
+ @Override
+ public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp)
+ {
+ return new CostBalancerStrategy(referenceTimestamp);
+ }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
index 6a019a6..1303dbd 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
@@ -92,7 +92,6 @@
private final IndexingServiceClient indexingServiceClient;
private final ScheduledExecutorService exec;
private final LoadQueueTaskMaster taskMaster;
-
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final AtomicReference<LeaderLatch> leaderLatch;
private volatile AtomicReference<MasterSegmentSettings> segmentSettingsAtomicReference;
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
index d1a96a0..889383a 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
@@ -78,7 +78,7 @@
{
final MasterStats stats = new MasterStats();
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
- final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
+ final BalancerStrategy strategy = params.getBalancerStrategyFactory().getBalancerStrategy(referenceTimestamp);
final int maxSegmentsToMove = params.getMasterSegmentSettings().getMaxSegmentsToMove();
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
@@ -113,34 +113,25 @@
}
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
- final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList);
+ final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
- if (params.getAvailableSegments().contains(segmentToMove.getSegment())) {
- final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
+ if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
+ final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
if (holder != null) {
moveSegment(segmentToMove, holder.getServer(), params);
}
}
}
-
- final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);
- final double normalization = analyzer.calculateNormalization(serverHolderList);
- final double normalizedInitialCost = initialTotalCost / normalization;
-
- stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
- stats.addToTieredStat("normalization", tier, (long) normalization);
- stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000));
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
+ if (params.getMasterSegmentSettings().isEmitBalancingStats()) {
+ strategy.emitStats(tier, stats, serverHolderList);
+ }
log.info(
- "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]",
- tier,
- initialTotalCost,
- normalization,
- normalizedInitialCost,
- currentlyMovingSegments.get(tier).size()
+ "[%s]: Segments Moved: [%d]", tier, currentlyMovingSegments.get(tier).size()
);
+
}
return params.buildFromExisting()
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
index d514b4d..95b5c8d 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
@@ -42,12 +42,6 @@
@Default("PT1800s")
public abstract Duration getMasterSegmentMergerPeriod();
- @Config("druid.master.millisToWaitBeforeDeleting")
- public long getMillisToWaitBeforeDeleting()
- {
- return 15 * 60 * 1000L;
- }
-
@Config("druid.master.merger.on")
public boolean isMergeSegments()
{
@@ -66,26 +60,16 @@
return null;
}
- @Config("druid.master.merge.threshold")
- public long getMergeBytesLimit()
- {
- return 100000000L;
- }
-
- @Config("druid.master.merge.maxSegments")
- public int getMergeSegmentsLimit()
- {
- return Integer.MAX_VALUE;
- }
-
- @Config("druid.master.balancer.maxSegmentsToMove")
- @Default("5")
- public abstract int getMaxSegmentsToMove();
-
@Config("druid.master.replicant.lifetime")
@Default("15")
public abstract int getReplicantLifetime();
+ @Config("druid.master.masterSegmentSettings")
+ public MasterSegmentSettings getMasterSegmentSettings()
+ {
+ return new MasterSegmentSettings.Builder().build();
+ }
+
@Config("druid.master.replicant.throttleLimit")
@Default("10")
public abstract int getReplicantThrottleLimit();
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
index 2bd4870..04a3ce5 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
@@ -68,7 +68,6 @@
DatabaseRuleManager databaseRuleManager = paramsWithReplicationManager.getDatabaseRuleManager();
for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
-
boolean foundMatchingRule = false;
for (Rule rule : rules) {
if (rule.appliesTo(segment, now)) {
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
index 3c76b58..4ad153c 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
@@ -49,6 +49,7 @@
private final MasterSegmentSettings masterSegmentSettings;
private final MasterStats stats;
private final DateTime balancerReferenceTimestamp;
+ private final BalancerStrategyFactory strategyFactory;
public DruidMasterRuntimeParams(
long startTime,
@@ -62,7 +63,8 @@
ServiceEmitter emitter,
MasterSegmentSettings masterSegmentSettings,
MasterStats stats,
- DateTime balancerReferenceTimestamp
+ DateTime balancerReferenceTimestamp,
+ BalancerStrategyFactory strategyFactory
)
{
this.startTime = startTime;
@@ -77,6 +79,7 @@
this.masterSegmentSettings = masterSegmentSettings;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
+ this.strategyFactory = strategyFactory;
}
public long getStartTime()
@@ -139,9 +142,9 @@
return balancerReferenceTimestamp;
}
- public BalancerCostAnalyzer getBalancerCostAnalyzer(DateTime referenceTimestamp)
+ public BalancerStrategyFactory getBalancerStrategyFactory()
{
- return new BalancerCostAnalyzer(referenceTimestamp);
+ return strategyFactory;
}
public boolean hasDeletionWaitTimeElapsed()
@@ -168,7 +171,8 @@
emitter,
masterSegmentSettings,
stats,
- balancerReferenceTimestamp
+ balancerReferenceTimestamp,
+ strategyFactory
);
}
@@ -186,6 +190,8 @@
private MasterSegmentSettings masterSegmentSettings;
private MasterStats stats;
private DateTime balancerReferenceTimestamp;
+ private boolean emitBalancingCostParams;
+ private BalancerStrategyFactory strategyFactory;
Builder()
{
@@ -201,6 +207,8 @@
this.stats = new MasterStats();
this.masterSegmentSettings = new MasterSegmentSettings.Builder().build();
this.balancerReferenceTimestamp = null;
+ this.emitBalancingCostParams = false;
+ this.strategyFactory = new CostBalancerStrategyFactory();
}
Builder(
@@ -215,7 +223,8 @@
ServiceEmitter emitter,
MasterSegmentSettings masterSegmentSettings,
MasterStats stats,
- DateTime balancerReferenceTimestamp
+ DateTime balancerReferenceTimestamp,
+ BalancerStrategyFactory strategyFactory
)
{
this.startTime = startTime;
@@ -230,6 +239,8 @@
this.masterSegmentSettings = masterSegmentSettings;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
+ this.strategyFactory = strategyFactory;
}
public DruidMasterRuntimeParams build()
@@ -246,10 +257,23 @@
emitter,
masterSegmentSettings,
stats,
- balancerReferenceTimestamp
+ balancerReferenceTimestamp,
+ strategyFactory
);
}
+ public Builder withBalancerStrategy(BalancerStrategyFactory strategyFactory)
+ {
+ this.strategyFactory = strategyFactory;
+ return this;
+ }
+
+ public Builder withEmitBalancingCostParams(boolean param)
+ {
+ emitBalancingCostParams = param;
+ return this;
+ }
+
public Builder withStartTime(long time)
{
startTime = time;
@@ -321,5 +345,11 @@
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
return this;
}
+
+ public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory)
+ {
+ this.strategyFactory=strategyFactory;
+ return this;
+ }
}
}
diff --git a/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java b/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java
index cd6502b..75a56fa 100644
--- a/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java
+++ b/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java
@@ -28,19 +28,22 @@
private long mergeBytesLimit= 100000000L;
private int mergeSegmentsLimit = Integer.MAX_VALUE;
private int maxSegmentsToMove = 5;
+ private boolean emitBalancingStats = false;
@JsonCreator
public MasterSegmentSettings(
@JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting,
@JsonProperty("mergeBytesLimit") Long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit,
- @JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove
+ @JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove,
+ @JsonProperty("emitBalancingStats") Boolean emitBalancingStats
)
{
this.maxSegmentsToMove=maxSegmentsToMove;
this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting;
this.mergeSegmentsLimit=mergeSegmentsLimit;
this.mergeBytesLimit=mergeBytesLimit;
+ this.emitBalancingStats = emitBalancingStats;
}
public static String getConfigKey()
@@ -60,6 +63,11 @@
return mergeBytesLimit;
}
+ public boolean isEmitBalancingStats()
+ {
+ return emitBalancingStats;
+ }
+
@JsonProperty
public int getMergeSegmentsLimit()
{
@@ -80,6 +88,7 @@
private long mergeBytesLimit;
private int mergeSegmentsLimit;
private int maxSegmentsToMove;
+ private boolean emitBalancingStats;
public Builder()
{
@@ -87,14 +96,16 @@
this.mergeBytesLimit= 100000000L;
this.mergeSegmentsLimit= Integer.MAX_VALUE;
this.maxSegmentsToMove = 5;
+ this.emitBalancingStats = false;
}
- public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove)
+ public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
+ this.emitBalancingStats = emitBalancingStats;
}
public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
@@ -123,7 +134,7 @@
public MasterSegmentSettings build()
{
- return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove);
+ return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove, emitBalancingStats);
}
}
}
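Note (illustrative, not part of the patch): emitBalancingStats defaults to false, so the initial-cost and normalization statistics are now only emitted when the flag is switched on (see the isEmitBalancingStats() check in DruidMasterBalancer above). One way to build settings with the flag enabled, via the five-argument Builder constructor added here and the defaults shown in this file:

// Values other than the final 'true' mirror the defaults in MasterSegmentSettings.
MasterSegmentSettings settings = new MasterSegmentSettings.Builder(
    15 * 60 * 1000L,     // millisToWaitBeforeDeleting
    100000000L,          // mergeBytesLimit
    Integer.MAX_VALUE,   // mergeSegmentsLimit
    5,                   // maxSegmentsToMove
    true                 // emitBalancingStats
).build();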
diff --git a/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategy.java b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategy.java
new file mode 100644
index 0000000..d953b69
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategy.java
@@ -0,0 +1,71 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012 Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+
+package com.metamx.druid.master;
+
+import com.metamx.druid.client.DataSegment;
+
+import java.util.List;
+import java.util.Random;
+
+public class RandomBalancerStrategy implements BalancerStrategy
+{
+ private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
+
+ @Override
+ public ServerHolder findNewSegmentHomeReplicator(
+ DataSegment proposalSegment, List<ServerHolder> serverHolders
+ )
+ {
+ if (serverHolders.size() == 1) {
+ return null;
+ } else {
+ // Keep drawing random servers until one that is not already serving the segment is found;
+ // this assumes at least one such server exists in the tier.
+ final Random random = new Random();
+ ServerHolder holder = serverHolders.get(random.nextInt(serverHolders.size()));
+ while (holder.isServingSegment(proposalSegment)) {
+ holder = serverHolders.get(random.nextInt(serverHolders.size()));
+ }
+ return holder;
+ }
+ }
+
+ @Override
+ public ServerHolder findNewSegmentHomeBalancer(
+ DataSegment proposalSegment, List<ServerHolder> serverHolders
+ )
+ {
+ return null; // The random strategy does not propose balancing moves.
+ }
+
+ @Override
+ public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
+ {
+ return sampler.getRandomBalancerSegmentHolder(serverHolders);
+ }
+
+ @Override
+ public void emitStats(
+ String tier, MasterStats stats, List<ServerHolder> serverHolderList
+ )
+ {
+ }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategyFactory.java b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategyFactory.java
new file mode 100644
index 0000000..0cce3a1
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategyFactory.java
@@ -0,0 +1,30 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012 Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import org.joda.time.DateTime;
+
+public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
+{
+ @Override
+ public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp)
+ {
+ return new RandomBalancerStrategy();
+ }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/ReservoirSegmentSampler.java b/server/src/main/java/com/metamx/druid/master/ReservoirSegmentSampler.java
new file mode 100644
index 0000000..4db994b
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/ReservoirSegmentSampler.java
@@ -0,0 +1,54 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012 Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+
+package com.metamx.druid.master;
+
+import com.metamx.druid.client.DataSegment;
+
+import java.util.List;
+import java.util.Random;
+
+public class ReservoirSegmentSampler
+{
+
+ public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
+ {
+ final Random rand = new Random();
+ ServerHolder fromServerHolder = null;
+ DataSegment proposalSegment = null;
+ int numSoFar = 0;
+
+ for (ServerHolder server : serverHolders) {
+ for (DataSegment segment : server.getServer().getSegments().values()) {
+ int randNum = rand.nextInt(numSoFar + 1);
+ // w.p. 1 / (numSoFar+1), swap out the server and segment
+ if (randNum == numSoFar) {
+ fromServerHolder = server;
+ proposalSegment = segment;
+ }
+ numSoFar++;
+ }
+ }
+ if (fromServerHolder != null) {
+ return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
+ } else {
+ return null;
+ }
+ }
+}
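Note (not part of the patch): the sampler above is standard reservoir sampling with a reservoir of size one. The k-th segment encountered becomes the proposal with probability 1/k and then survives each later draw j with probability (j - 1)/j, so its overall probability of being returned is (1/k) * (k/(k+1)) * ... * ((n-1)/n) = 1/n, i.e. uniform over all n segments in the tier regardless of how they are spread across servers. The ReservoirSegmentSamplerTest added later in this patch exercises this empirically.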
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index 8a9a014..64aa3df 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -22,7 +22,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.master.BalancerCostAnalyzer;
+import com.metamx.druid.master.BalancerStrategy;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
@@ -60,15 +60,14 @@
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
- final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
-
+ final BalancerStrategy strategy = params.getBalancerStrategyFactory().getBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
stats.accumulate(
assign(
params.getReplicationManager(),
expectedReplicants,
totalReplicants,
- analyzer,
+ strategy,
serverHolderList,
segment
)
@@ -84,7 +83,7 @@
final ReplicationThrottler replicationManager,
final int expectedReplicants,
int totalReplicants,
- final BalancerCostAnalyzer analyzer,
+ final BalancerStrategy strategy,
final List<ServerHolder> serverHolderList,
final DataSegment segment
)
@@ -98,7 +97,7 @@
break;
}
- final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList);
+ final ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
if (holder == null) {
log.warn(
diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java
index 4f59483..8513c15 100644
--- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java
+++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java
@@ -132,6 +132,78 @@
EasyMock.verify(druidServer4);
}
+
+ @Test
+ public void testMoveToEmptyServerBalancer()
+ {
+ EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
+ EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
+ EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
+ EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
+ EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(druidServer1);
+
+ EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
+ EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
+ EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
+ EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
+ EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
+ EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(druidServer2);
+
+ EasyMock.replay(druidServer3);
+ EasyMock.replay(druidServer4);
+
+ // Mock stuff that the master needs
+ master.moveSegment(
+ EasyMock.<String>anyObject(),
+ EasyMock.<String>anyObject(),
+ EasyMock.<String>anyObject(),
+ EasyMock.<LoadPeonCallback>anyObject()
+ );
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(master);
+
+ LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+ LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
+
+ DruidMasterRuntimeParams params =
+ DruidMasterRuntimeParams.newBuilder()
+ .withDruidCluster(
+ new DruidCluster(
+ ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
+ "normal",
+ MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
+ .create(
+ Arrays.asList(
+ new ServerHolder(druidServer1, fromPeon),
+ new ServerHolder(druidServer2, toPeon)
+ )
+ )
+ )
+ )
+ )
+ .withLoadManagementPeons(
+ ImmutableMap.<String, LoadQueuePeon>of(
+ "from",
+ fromPeon,
+ "to",
+ toPeon
+ )
+ )
+ .withAvailableSegments(segments.values())
+ .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+ .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
+ .build();
+
+ params = new DruidMasterBalancerTester(master).run(params);
+ Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
+ Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size());
+ }
+
+
+
+
@Test
public void testRun1()
{
@@ -203,7 +275,8 @@
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
}
- @Test
+
+ @Test
public void testRun2()
{
// Mock some servers of different usages
@@ -295,4 +368,5 @@
params = new DruidMasterBalancerTester(master).run(params);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
}
+
}
diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
index 213e350..59b727d 100644
--- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
+++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
@@ -36,7 +36,6 @@
import org.junit.Test;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
/**
*/
@@ -96,21 +95,15 @@
}
@Override
- public long getMillisToWaitBeforeDeleting()
- {
- return super.getMillisToWaitBeforeDeleting();
- }
-
- @Override
public String getMergerServiceName()
{
return "";
}
@Override
- public int getMaxSegmentsToMove()
+ public MasterSegmentSettings getMasterSegmentSettings()
{
- return 0;
+ return new MasterSegmentSettings.Builder().withMillisToWaitBeforeDeleting(super.getMasterSegmentSettings().getMillisToWaitBeforeDeleting()).withMaxSegmentsToMove(0).build();
}
@Override
diff --git a/server/src/test/java/com/metamx/druid/master/ReservoirSegmentSamplerTest.java b/server/src/test/java/com/metamx/druid/master/ReservoirSegmentSamplerTest.java
new file mode 100644
index 0000000..7b768e1
--- /dev/null
+++ b/server/src/test/java/com/metamx/druid/master/ReservoirSegmentSamplerTest.java
@@ -0,0 +1,208 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012 Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.client.DruidServer;
+import com.metamx.druid.shard.NoneShardSpec;
+import junit.framework.Assert;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class ReservoirSegmentSamplerTest
+{
+ private DruidServer druidServer1;
+ private DruidServer druidServer2;
+ private DruidServer druidServer3;
+ private DruidServer druidServer4;
+
+ private ServerHolder holder1;
+ private ServerHolder holder2;
+ private ServerHolder holder3;
+ private ServerHolder holder4;
+
+ private DataSegment segment1;
+ private DataSegment segment2;
+ private DataSegment segment3;
+ private DataSegment segment4;
+ Map<String, DataSegment> segmentsMap1;
+ Map<String, DataSegment> segmentsMap2;
+ Map<String, DataSegment> segmentsMap3;
+ Map<String, DataSegment> segmentsMap4;
+ List<DataSegment> segments;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ druidServer1 = EasyMock.createMock(DruidServer.class);
+ druidServer2 = EasyMock.createMock(DruidServer.class);
+ druidServer3 = EasyMock.createMock(DruidServer.class);
+ druidServer4 = EasyMock.createMock(DruidServer.class);
+ holder1 = EasyMock.createMock(ServerHolder.class);
+ holder2 = EasyMock.createMock(ServerHolder.class);
+ holder3 = EasyMock.createMock(ServerHolder.class);
+ holder4 = EasyMock.createMock(ServerHolder.class);
+ segment1 = EasyMock.createMock(DataSegment.class);
+ segment2 = EasyMock.createMock(DataSegment.class);
+ segment3 = EasyMock.createMock(DataSegment.class);
+ segment4 = EasyMock.createMock(DataSegment.class);
+
+ DateTime start1 = new DateTime("2012-01-01");
+ DateTime start2 = new DateTime("2012-02-01");
+ DateTime version = new DateTime("2012-03-01");
+ segment1 = new DataSegment(
+ "datasource1",
+ new Interval(start1, start1.plusHours(1)),
+ version.toString(),
+ Maps.<String, Object>newHashMap(),
+ Lists.<String>newArrayList(),
+ Lists.<String>newArrayList(),
+ new NoneShardSpec(),
+ 0,
+ 11L
+ );
+ segment2 = new DataSegment(
+ "datasource1",
+ new Interval(start2, start2.plusHours(1)),
+ version.toString(),
+ Maps.<String, Object>newHashMap(),
+ Lists.<String>newArrayList(),
+ Lists.<String>newArrayList(),
+ new NoneShardSpec(),
+ 0,
+ 7L
+ );
+ segment3 = new DataSegment(
+ "datasource2",
+ new Interval(start1, start1.plusHours(1)),
+ version.toString(),
+ Maps.<String, Object>newHashMap(),
+ Lists.<String>newArrayList(),
+ Lists.<String>newArrayList(),
+ new NoneShardSpec(),
+ 0,
+ 4L
+ );
+ segment4 = new DataSegment(
+ "datasource2",
+ new Interval(start2, start2.plusHours(1)),
+ version.toString(),
+ Maps.<String, Object>newHashMap(),
+ Lists.<String>newArrayList(),
+ Lists.<String>newArrayList(),
+ new NoneShardSpec(),
+ 0,
+ 8L
+ );
+
+ segments = Lists.newArrayList(segment1, segment2, segment3, segment4);
+
+ segmentsMap1 = ImmutableMap.<String, DataSegment>of(
+ "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
+ segment1
+ );
+ segmentsMap2 = ImmutableMap.<String, DataSegment>of(
+ "datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
+ segment2
+ );
+ segmentsMap3 = ImmutableMap.<String, DataSegment>of(
+ "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
+ segment3
+ );
+ segmentsMap4 = ImmutableMap.<String, DataSegment>of(
+ "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
+ segment4
+ );
+ }
+
+ //checks if every segment is selected at least once out of 5000 trials
+ @Test
+ public void getRandomBalancerSegmentHolderTest()
+ {
+ EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
+ EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
+ EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
+ EasyMock.expect(druidServer1.getSegments()).andReturn(segmentsMap1).anyTimes();
+ EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(druidServer1);
+
+ EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
+ EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
+ EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce();
+ EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
+ EasyMock.expect(druidServer2.getSegments()).andReturn(segmentsMap2).anyTimes();
+ EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(druidServer2);
+
+ EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
+ EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
+ EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce();
+ EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
+ EasyMock.expect(druidServer3.getSegments()).andReturn(segmentsMap3).anyTimes();
+ EasyMock.expect(druidServer3.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(druidServer3);
+
+ EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
+ EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
+ EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce();
+ EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
+ EasyMock.expect(druidServer4.getSegments()).andReturn(segmentsMap4).anyTimes();
+ EasyMock.expect(druidServer4.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(druidServer4);
+
+ EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
+ EasyMock.replay(holder1);
+ EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
+ EasyMock.replay(holder2);
+
+ EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
+ EasyMock.replay(holder3);
+
+ EasyMock.expect(holder4.getServer()).andReturn(druidServer4).anyTimes();
+ EasyMock.replay(holder4);
+
+ List<ServerHolder> holderList = Lists.newArrayList();
+ holderList.add(holder1);
+ holderList.add(holder2);
+ holderList.add(holder3);
+ holderList.add(holder4);
+
+ Map<DataSegment, Integer> segmentCountMap = Maps.newHashMap();
+ ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
+ for (int i = 0; i < 5000; i++) {
+ segmentCountMap.put(sampler.getRandomBalancerSegmentHolder(holderList).getSegment(), 1);
+ }
+
+ for (DataSegment segment : segments) {
+ Assert.assertEquals(segmentCountMap.get(segment), new Integer(1));
+ }
+
+
+ }
+}
diff --git a/server/src/test/java/com/metamx/druid/utils/DruidMasterBalancerProfiler.java b/server/src/test/java/com/metamx/druid/utils/DruidMasterBalancerProfiler.java
new file mode 100644
index 0000000..482274c
--- /dev/null
+++ b/server/src/test/java/com/metamx/druid/utils/DruidMasterBalancerProfiler.java
@@ -0,0 +1,249 @@
+/*
+* Druid - a distributed column store.
+* Copyright (C) 2012 Metamarkets Group Inc.
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+
+package com.metamx.druid.utils;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.client.DruidServer;
+import com.metamx.druid.db.DatabaseRuleManager;
+import com.metamx.druid.master.DruidCluster;
+import com.metamx.druid.master.DruidMaster;
+import com.metamx.druid.master.DruidMasterBalancerTester;
+import com.metamx.druid.master.DruidMasterRuleRunner;
+import com.metamx.druid.master.DruidMasterRuntimeParams;
+import com.metamx.druid.master.LoadPeonCallback;
+import com.metamx.druid.master.LoadQueuePeon;
+import com.metamx.druid.master.LoadQueuePeonTester;
+import com.metamx.druid.master.MasterSegmentSettings;
+import com.metamx.druid.master.ReplicationThrottler;
+import com.metamx.druid.master.SegmentReplicantLookup;
+import com.metamx.druid.master.ServerHolder;
+import com.metamx.druid.master.rules.PeriodLoadRule;
+import com.metamx.druid.master.rules.Rule;
+import com.metamx.druid.shard.NoneShardSpec;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.service.ServiceEmitter;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DruidMasterBalancerProfiler
+{
+ private static final int MAX_SEGMENTS_TO_MOVE = 5;
+ private DruidMaster master;
+ private DruidServer druidServer1;
+ private DruidServer druidServer2;
+ private Map<String, DataSegment> segments = Maps.newHashMap();
+ private ServiceEmitter emitter;
+ private DatabaseRuleManager manager;
+ private PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"), 3, "normal");
+ private List<Rule> rules = ImmutableList.<Rule>of(loadRule);
+
+ @Before
+ public void setUp() throws Exception
+ {
+ master = EasyMock.createMock(DruidMaster.class);
+ druidServer1 = EasyMock.createMock(DruidServer.class);
+ druidServer2 = EasyMock.createMock(DruidServer.class);
+ emitter = EasyMock.createMock(ServiceEmitter.class);
+ EmittingLogger.registerEmitter(emitter);
+ manager = EasyMock.createMock(DatabaseRuleManager.class);
+ }
+
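+ /**
+ * Profiles one balancer pass and one rule-runner pass over a large mocked cluster:
+ * 55,000 segments (all starting on a single server) spread across 50 servers in the
+ * "normal" tier, timed with a Guava Stopwatch.
+ */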
+ public void bigProfiler()
+ {
+ Stopwatch watch = new Stopwatch();
+ int numSegments = 55000;
+ int numServers = 50;
+ EasyMock.expect(manager.getAllRules()).andReturn(ImmutableMap.<String, List<Rule>>of("test", rules)).anyTimes();
+ EasyMock.expect(manager.getRules(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
+ EasyMock.expect(manager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
+ EasyMock.replay(manager);
+
+ master.moveSegment(
+ EasyMock.<String>anyObject(),
+ EasyMock.<String>anyObject(),
+ EasyMock.<String>anyObject(),
+ EasyMock.<LoadPeonCallback>anyObject()
+ );
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(master);
+
+ List<DruidServer> serverList = Lists.newArrayList();
+ Map<String, LoadQueuePeon> peonMap = Maps.newHashMap();
+ List<ServerHolder> serverHolderList = Lists.newArrayList();
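+ // Build 55,000 one-hour segments, each in its own datasource, keyed by segment name.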
+ Map<String, DataSegment> segmentMap = Maps.newHashMap();
+ for (int i = 0; i < numSegments; i++)
+ {
+ segmentMap.put(
+ "segment" + i,
+ new DataSegment(
+ "datasource" + i,
+ new Interval(new DateTime("2012-01-01"), (new DateTime("2012-01-01")).plusHours(1)),
+ (new DateTime("2012-03-01")).toString(),
+ Maps.<String, Object>newHashMap(),
+ Lists.<String>newArrayList(),
+ Lists.<String>newArrayList(),
+ new NoneShardSpec(),
+ 0,
+ 4L
+ )
+ );
+ }
+
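+ // Mock the servers: server 0 starts with every segment, the rest start empty, and
+ // each server gets its own test load queue peon.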
+ for (int i = 0; i < numServers; i++)
+ {
+ DruidServer server = EasyMock.createMock(DruidServer.class);
+ EasyMock.expect(server.getMetadata()).andReturn(null).anyTimes();
+ EasyMock.expect(server.getCurrSize()).andReturn(30L).atLeastOnce();
+ EasyMock.expect(server.getMaxSize()).andReturn(100L).atLeastOnce();
+ EasyMock.expect(server.getTier()).andReturn("normal").anyTimes();
+ EasyMock.expect(server.getName()).andReturn(Integer.toString(i)).atLeastOnce();
+ EasyMock.expect(server.getHost()).andReturn(Integer.toString(i)).anyTimes();
+ if (i == 0)
+ {
+ EasyMock.expect(server.getSegments()).andReturn(segmentMap).anyTimes();
+ }
+ else
+ {
+ EasyMock.expect(server.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
+ }
+ EasyMock.expect(server.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(server);
+
+ LoadQueuePeon peon = new LoadQueuePeonTester();
+ peonMap.put(Integer.toString(i), peon);
+ serverHolderList.add(new ServerHolder(server, peon));
+ }
+
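+ // Wire up the runtime params the master would normally assemble: the single "normal"
+ // tier, the load queue peons, all available segments, balancer settings capped at
+ // MAX_SEGMENTS_TO_MOVE, a replication throttler, and the rule manager mocked above.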
+ DruidMasterRuntimeParams params =
+ DruidMasterRuntimeParams.newBuilder()
+ .withDruidCluster(
+ new DruidCluster(
+ ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
+ "normal",
+ MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
+ .create(
+ serverHolderList
+ )
+ )
+ )
+ )
+ .withLoadManagementPeons(
+ peonMap
+ )
+ .withAvailableSegments(segmentMap.values())
+ .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+ .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
+ .withEmitter(emitter)
+ .withDatabaseRuleManager(manager)
+ .withReplicationManager(new ReplicationThrottler(2, 500))
+ .withSegmentReplicantLookup(
+ SegmentReplicantLookup.make(new DruidCluster(
+ ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
+ "normal",
+ MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
+ .create(
+ serverHolderList
+ )
+ )
+ )
+ )
+ )
+ .build();
+
+ DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);
+ DruidMasterRuleRunner runner = new DruidMasterRuleRunner(master, 500, 5);
+ watch.start();
+ DruidMasterRuntimeParams balanceParams = tester.run(params);
+ DruidMasterRuntimeParams assignParams = runner.run(params);
+ System.out.println(watch.stop());
+ }
+
+
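+ /**
+ * Profiles a single balancer run over a minimal two-server setup: a "from" server
+ * holding the fixture's segments map and an empty "to" server in the "normal" tier.
+ */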
+ public void profileRun()
+ {
+ Stopwatch watch = new Stopwatch();
+ LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+ LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
+
+ EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
+ EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
+ EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
+ EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
+ EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(druidServer1);
+
+ EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
+ EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
+ EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
+ EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
+ EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
+ EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(druidServer2);
+
+ master.moveSegment(
+ EasyMock.<String>anyObject(),
+ EasyMock.<String>anyObject(),
+ EasyMock.<String>anyObject(),
+ EasyMock.<LoadPeonCallback>anyObject()
+ );
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(master);
+
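+ // Assemble runtime params for the two-server "normal" tier, again capping moves at
+ // MAX_SEGMENTS_TO_MOVE, then time a single run of the balancer tester.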
+ DruidMasterRuntimeParams params =
+ DruidMasterRuntimeParams.newBuilder()
+ .withDruidCluster(
+ new DruidCluster(
+ ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
+ "normal",
+ MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
+ .create(
+ Arrays.asList(
+ new ServerHolder(druidServer1, fromPeon),
+ new ServerHolder(druidServer2, toPeon)
+ )
+ )
+ )
+ )
+ )
+ .withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("from", fromPeon, "to", toPeon))
+ .withAvailableSegments(segments.values())
+ .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+ .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
+ .build();
+ DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);
+ watch.start();
+ DruidMasterRuntimeParams balanceParams = tester.run(params);
+ System.out.println(watch.stop());
+ }
+
+}