blob: bdf475b8d1b83a8e7742f7cc686378a64e83c2a6 [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* The BalancerAnalyzer keeps the state of the highest and lowest percent used servers. It will update
* these states and perform lookaheads to make sure the updated states result in a more balanced cluster.
*/
public class BalancerAnalyzer
{
private static final Logger log = new Logger(BalancerAnalyzer.class);
private static final int PERCENT_THRESHOLD = 3;
private static final int MAX_SEGMENTS_TO_MOVE = 5;
private volatile Long highestSizeUsed;
private volatile double highestPercentUsed;
private volatile Long highestPercentUsedServerMaxSize;
private volatile Long lowestSizeUsed;
private volatile double lowestPercentUsed;
private volatile Long lowestPercentUsedServerMaxSize;
public BalancerAnalyzer()
{
this.highestSizeUsed = 0L;
this.highestPercentUsed = 0;
this.highestPercentUsedServerMaxSize = 0L;
this.lowestSizeUsed = 0L;
this.lowestPercentUsed = 0;
this.lowestPercentUsedServerMaxSize = 0L;
}
public void init(ServerHolder highestPercentUsedServer, ServerHolder lowestPercentUsedServer)
{
highestSizeUsed = highestPercentUsedServer.getSizeUsed();
highestPercentUsed = highestPercentUsedServer.getPercentUsed();
highestPercentUsedServerMaxSize = highestPercentUsedServer.getMaxSize();
lowestSizeUsed = lowestPercentUsedServer.getSizeUsed();
lowestPercentUsed = lowestPercentUsedServer.getPercentUsed();
lowestPercentUsedServerMaxSize = lowestPercentUsedServer.getMaxSize();
}
public void update(long newHighestSizeUsed, long newLowestSizedUsed)
{
highestSizeUsed = newHighestSizeUsed;
highestPercentUsed = highestSizeUsed.doubleValue() / highestPercentUsedServerMaxSize;
lowestSizeUsed = newLowestSizedUsed;
lowestPercentUsed = lowestSizeUsed.doubleValue() / lowestPercentUsedServerMaxSize;
}
public double getPercentDiff()
{
return Math.abs(
100 * ((highestPercentUsed - lowestPercentUsed)
/ ((highestPercentUsed + lowestPercentUsed) / 2))
);
}
public double getLookaheadPercentDiff(Long newHighestSizeUsed, Long newLowestSizedUsed)
{
double newHighestPercentUsed = 100 * (newHighestSizeUsed.doubleValue() / highestPercentUsedServerMaxSize);
double newLowestPercentUsed = 100 * (newLowestSizedUsed.doubleValue() / lowestPercentUsedServerMaxSize);
return Math.abs(
100 * ((newHighestPercentUsed - newLowestPercentUsed)
/ ((newHighestPercentUsed + newLowestPercentUsed) / 2))
);
}
public Set<BalancerSegmentHolder> findSegmentsToMove(DruidServer server)
{
Set<BalancerSegmentHolder> segmentsToMove = Sets.newHashSet();
double currPercentDiff = getPercentDiff();
if (currPercentDiff < PERCENT_THRESHOLD) {
log.info("Cluster usage is balanced.");
return segmentsToMove;
}
List<DruidDataSource> dataSources = Lists.newArrayList(server.getDataSources());
Collections.shuffle(dataSources);
for (DruidDataSource dataSource : dataSources) {
List<DataSegment> segments = Lists.newArrayList(dataSource.getSegments());
Collections.shuffle(segments);
for (DataSegment segment : segments) {
if (segmentsToMove.size() >= MAX_SEGMENTS_TO_MOVE) {
return segmentsToMove;
}
if (getLookaheadPercentDiff(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize())
< currPercentDiff) {
segmentsToMove.add(new BalancerSegmentHolder(server, segment));
update(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize());
}
}
}
return segmentsToMove;
}
}