/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package com.metamx.druid.master.rules;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.master.MasterStats;
import com.metamx.druid.master.ServerHolder;
import com.metamx.emitter.EmittingLogger;
import java.util.List;
import java.util.Map;

/**
* LoadRules indicate the number of replicants a segment should have in a given tier.
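*
* As a minimal illustration only (not part of this file), a concrete subclass pinned to a
* hypothetical "hot" tier with two replicants might look like the sketch below; any other
* methods required by the Rule interface are omitted:
* <pre>
* public class HotTierLoadRule extends LoadRule
* {
*   public int getReplicants() { return 2; }
*
*   public int getReplicants(String tier) { return "hot".equals(tier) ? 2 : 0; }
*
*   public String getTier() { return "hot"; }
* }
* </pre>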
*/
public abstract class LoadRule implements Rule
{
private static final EmittingLogger log = new EmittingLogger(LoadRule.class);

@Override
public MasterStats run(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment)
{
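// Compare the replicant count this rule expects in its tier against what the cluster currently
// serves, then assign additional replicants or drop excess ones to converge on that target.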
MasterStats stats = new MasterStats();
int expectedReplicants = getReplicants();
int actualReplicants = params.getSegmentReplicantLookup().lookup(segment.getIdentifier(), getTier());
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(getTier());
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", getTier()).emit();
return stats;
}
stats.accumulate(assign(expectedReplicants, actualReplicants, serverQueue, segment));
stats.accumulate(drop(expectedReplicants, actualReplicants, segment, params));
return stats;
}

private MasterStats assign(
int expectedReplicants,
int actualReplicants,
MinMaxPriorityQueue<ServerHolder> serverQueue,
DataSegment segment
)
{
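// Poll candidate servers from the front of the tier's priority queue and hand each one the
// segment until the expected replicant count is reached; polled servers are collected in
// assignedServers and added back to the queue after the loop so it stays populated for later segments.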
MasterStats stats = new MasterStats();
List<ServerHolder> assignedServers = Lists.newArrayList();
while (actualReplicants < expectedReplicants) {
ServerHolder holder = serverQueue.pollFirst();
if (holder == null) {
log.warn(
"Not enough %s servers[%d] to assign segment[%s]! Expected Replicants[%d]",
getTier(),
assignedServers.size() + serverQueue.size() + 1,
segment.getIdentifier(),
expectedReplicants
);
break;
}
if (holder.containsSegment(segment)) {
// This server already has the segment; remember it so it is added back to the queue after the loop.
assignedServers.add(holder);
continue;
}
if (holder.getAvailableSize() < segment.getSize()) {
log.warn(
"Not enough node capacity, closest is [%s] with %,d available, skipping segment[%s].",
holder.getServer(),
holder.getAvailableSize(),
segment
);
log.makeAlert(
"Not enough node capacity",
ImmutableMap.<String, Object>builder()
.put("segmentSkipped", segment.toString())
.put("closestNode", holder.getServer().toString())
.put("availableSize", holder.getAvailableSize())
.build()
).emit();
serverQueue.add(holder);
stats.addToTieredStat("unassignedCount", getTier(), 1);
stats.addToTieredStat("unassignedSize", getTier(), segment.getSize());
break;
}
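// Ask the server's load queue peon to load the segment; the callback is a no-op, so the master
// does not block waiting for the load to finish.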
holder.getPeon().loadSegment(
segment,
new LoadPeonCallback()
{
@Override
protected void execute()
{
}
}
);
assignedServers.add(holder);
stats.addToTieredStat("assignedCount", getTier(), 1);
++actualReplicants;
}
serverQueue.addAll(assignedServers);
return stats;
}

private MasterStats drop(
int expectedReplicants,
int actualReplicants,
DataSegment segment,
DruidMasterRuntimeParams params
)
{
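// Dropping only happens once the deletion wait time has elapsed and the cluster already serves
// at least the expected number of replicants; excess replicants are then dropped tier by tier.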
MasterStats stats = new MasterStats();
if (!params.hasDeletionWaitTimeElapsed()) {
return stats;
}
// Make sure we have enough actual replicants in the cluster before doing anything
if (actualReplicants < expectedReplicants) {
return stats;
}
Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getTiers(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : replicantsByType.entrySet()) {
String tier = entry.getKey();
int actualNumReplicantsForType = entry.getValue();
int expectedNumReplicantsForType = getReplicants(tier);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
if (serverQueue == null) {
log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
return stats;
}
List<ServerHolder> droppedServers = Lists.newArrayList();
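// Drop from the tail of the tier's priority queue until the replicant count for this tier
// matches the rule; polled servers are returned to the queue after the loop.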
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
break;
}
if (holder.isServingSegment(segment)) {
holder.getPeon().dropSegment(
segment,
new LoadPeonCallback()
{
@Override
protected void execute()
{
}
}
);
--actualNumReplicantsForType;
stats.addToTieredStat("droppedCount", tier, 1);
}
droppedServers.add(holder);
}
serverQueue.addAll(droppedServers);
}
return stats;
}

public abstract int getReplicants();
public abstract int getReplicants(String tier);
public abstract String getTier();
}