/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.server.coordinator.duty;

import com.google.common.collect.Lists;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.BalancerSegmentHolder;
import org.apache.druid.server.coordinator.BalancerStrategy;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.LoadPeonCallback;
import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
 * Coordinator duty that balances segments across the historicals of each tier. Segments are first
 * moved off decommissioning servers (up to the configured percentage of maxSegmentsToMove) and the
 * remaining budget is then rebalanced among the active servers, with destinations chosen by the
 * configured {@link BalancerStrategy}.
 */
public class BalanceSegments implements CoordinatorDuty
{
  protected static final EmittingLogger log = new EmittingLogger(BalanceSegments.class);

  protected final DruidCoordinator coordinator;
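  // Segments currently being moved, keyed by tier. Entries are added when a move is issued and are
  // removed by the load-queue callback once the destination server finishes loading the segment.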
  protected final Map<String, ConcurrentHashMap<SegmentId, BalancerSegmentHolder>> currentlyMovingSegments =
      new HashMap<>();
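  // Reservoir size passed to the segment sampler when the batched sampler is disabled:
  // segments are then sampled one at a time per balancing iteration.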
  private static final int DEFAULT_RESERVOIR_SIZE = 1;
  public BalanceSegments(DruidCoordinator coordinator)
  {
    this.coordinator = coordinator;
  }
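  /**
   * Decrements the remaining lifetime of every segment move still pending in the given tier and
   * emits an alert for any move whose lifetime has run out, since that indicates a segment stuck
   * in the balancer move queue.
   */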
  protected void reduceLifetimes(String tier)
  {
    for (BalancerSegmentHolder holder : currentlyMovingSegments.get(tier).values()) {
      holder.reduceLifetime();
      if (holder.getLifetime() <= 0) {
        log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", tier)
            .addData("segment", holder.getSegment().getId())
            .addData("server", holder.getFromServer().getMetadata())
            .emit();
      }
    }
  }
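  /**
   * Runs one balancing pass, invoking {@link #balanceTier} for every historical tier in the cluster
   * and attaching the collected stats to the returned params.
   */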
  @Override
  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
  {
    final CoordinatorStats stats = new CoordinatorStats();
    params.getDruidCluster().getHistoricals().forEach((String tier, NavigableSet<ServerHolder> servers) -> {
      balanceTier(params, tier, servers, stats);
    });
    return params.buildFromExisting().withCoordinatorStats(stats).build();
  }
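  /**
   * Balances a single tier: bails out early if there are no used segments, if previous moves are
   * still pending, or if there are not enough active servers. Otherwise, segments are moved off
   * decommissioning servers first (limited by decommissioningMaxPercentOfMaxSegmentsToMove) and the
   * remaining move budget is rebalanced among the active servers.
   */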
  private void balanceTier(
      DruidCoordinatorRuntimeParams params,
      String tier,
      SortedSet<ServerHolder> servers,
      CoordinatorStats stats
  )
  {
    if (params.getUsedSegments().size() == 0) {
      log.info("Metadata segments are not available. Cannot balance.");
      // return early to avoid emitting zero stats
      return;
    }

    currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());

    if (!currentlyMovingSegments.get(tier).isEmpty()) {
      reduceLifetimes(tier);
      log.info(
          "[%s]: Still waiting on %,d segments to be moved. Skipping balance.",
          tier,
          currentlyMovingSegments.get(tier).size()
      );
      // return early to avoid emitting zero stats
      return;
    }

    /*
      Take as many segments from decommissioning servers as decommissioningMaxPercentOfMaxSegmentsToMove allows and
      find the best location for them on active servers. After that, balance segments within the active server pool.
     */
    Map<Boolean, List<ServerHolder>> partitions =
        servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
    final List<ServerHolder> decommissioningServers = partitions.get(true);
    final List<ServerHolder> activeServers = partitions.get(false);
    log.info(
        "Found %d active servers, %d decommissioning servers",
        activeServers.size(),
        decommissioningServers.size()
    );

    if ((decommissioningServers.isEmpty() && activeServers.size() <= 1) || activeServers.isEmpty()) {
      log.warn("[%s]: insufficient active servers. Cannot balance.", tier);
      // return early to avoid emitting zero stats
      return;
    }

    int numSegments = 0;
    for (ServerHolder sourceHolder : servers) {
      numSegments += sourceHolder.getServer().getNumSegments();
    }

    if (numSegments == 0) {
      log.info("No segments found. Cannot balance.");
      // return early to avoid emitting zero stats
      return;
    }
    final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments);
    int decommissioningMaxPercentOfMaxSegmentsToMove =
        params.getCoordinatorDynamicConfig().getDecommissioningMaxPercentOfMaxSegmentsToMove();
    int maxSegmentsToMoveFromDecommissioningNodes =
        (int) Math.ceil(maxSegmentsToMove * (decommissioningMaxPercentOfMaxSegmentsToMove / 100.0));
    log.info(
        "Processing %d segments for moving from decommissioning servers",
        maxSegmentsToMoveFromDecommissioningNodes
    );
    Pair<Integer, Integer> decommissioningResult =
        balanceServers(params, decommissioningServers, activeServers, maxSegmentsToMoveFromDecommissioningNodes);

    int maxGeneralSegmentsToMove = maxSegmentsToMove - decommissioningResult.lhs;
    log.info("Processing %d segments for balancing between active servers", maxGeneralSegmentsToMove);
    Pair<Integer, Integer> generalResult =
        balanceServers(params, activeServers, activeServers, maxGeneralSegmentsToMove);

    int moved = generalResult.lhs + decommissioningResult.lhs;
    int unmoved = generalResult.rhs + decommissioningResult.rhs;
    if (unmoved == maxSegmentsToMove) {
      // Cluster should be alive and constantly adjusting
      log.info("No good moves found in tier [%s]", tier);
    }
    stats.addToTieredStat("unmovedCount", tier, unmoved);
    stats.addToTieredStat("movedCount", tier, moved);

    if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
      final BalancerStrategy strategy = params.getBalancerStrategy();
      strategy.emitStats(tier, stats, Lists.newArrayList(servers));
    }
    log.info("[%s]: Segments Moved: [%d] Segments Let Alone: [%d]", tier, moved, unmoved);
  }
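  /**
   * Moves at most {@code maxSegmentsToMove} segments from the {@code toMoveFrom} servers to the
   * {@code toMoveTo} servers, using the configured balancer strategy to pick both the candidate
   * segments and their destinations.
   *
   * @return a pair of (number of segments moved, number of segments left unmoved)
   */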
  private Pair<Integer, Integer> balanceServers(
      DruidCoordinatorRuntimeParams params,
      List<ServerHolder> toMoveFrom,
      List<ServerHolder> toMoveTo,
      int maxSegmentsToMove
  )
  {
    if (maxSegmentsToMove <= 0) {
      return new Pair<>(0, 0);
    }

    final BalancerStrategy strategy = params.getBalancerStrategy();
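    // Allow up to twice as many loop iterations as requested moves so that the loop below still
    // terminates when many picked candidates are skipped without counting as moved or unmoved.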
    final int maxIterations = 2 * maxSegmentsToMove;
    final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
    int moved = 0, unmoved = 0;

    Iterator<BalancerSegmentHolder> segmentsToMove = strategy.pickSegmentsToMove(
        toMoveFrom,
        params.getBroadcastDatasources(),
        params.getCoordinatorDynamicConfig().useBatchedSegmentSampler() ? maxSegmentsToMove : DEFAULT_RESERVOIR_SIZE,
        params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove()
    );
    //noinspection ForLoopThatDoesntUseLoopVariable
    for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
      if (!segmentsToMove.hasNext()) {
        log.info("All servers to move segments from are empty, ending run.");
        break;
      }
      final BalancerSegmentHolder segmentToMoveHolder = segmentsToMove.next();

      // DruidCoordinatorRuntimeParams.getUsedSegments originates from SegmentsMetadataManager, i.e. it is the set of
      // segments that *should* be loaded. segmentToMoveHolder.getSegment originates from ServerInventoryView, i.e.
      // it may be any segment that happens to be loaded on some server, even if it is not used. (The Coordinator
      // closes such discrepancies eventually via UnloadUnusedSegments.) Therefore the picked segmentToMoveHolder's
      // segment may not need to be balanced.
      boolean needToBalancePickedSegment = params.getUsedSegments().contains(segmentToMoveHolder.getSegment());
      if (needToBalancePickedSegment) {
        final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
        final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer();
        // we want to leave the server the segment is currently on in the list...
        // but filter out replicas that are already serving the segment, and servers with a full load queue
        final List<ServerHolder> toMoveToWithLoadQueueCapacityAndNotServingSegment =
            toMoveTo.stream()
                    .filter(s -> s.getServer().equals(fromServer) ||
                                 (!s.isServingSegment(segmentToMove) &&
                                  (maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad)))
                    .collect(Collectors.toList());

        if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) {
          final ServerHolder destinationHolder =
              strategy.findNewSegmentHomeBalancer(segmentToMove, toMoveToWithLoadQueueCapacityAndNotServingSegment);

          if (destinationHolder != null && !destinationHolder.getServer().equals(fromServer)) {
            if (moveSegment(segmentToMoveHolder, destinationHolder.getServer(), params)) {
              moved++;
            } else {
              unmoved++;
            }
          } else {
            log.debug("Segment [%s] is 'optimally' placed.", segmentToMove.getId());
            unmoved++;
          }
        } else {
          log.debug("No valid movement destinations for segment [%s].", segmentToMove.getId());
          unmoved++;
        }
      }
      if (iter >= maxIterations) {
        log.info(
            "Unable to select %d remaining candidate segments out of %d total to balance "
            + "after %d iterations, ending run.",
            (maxSegmentsToMove - moved - unmoved),
            maxSegmentsToMove,
            iter
        );
        break;
      }
    }

    return new Pair<>(moved, unmoved);
  }
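  /**
   * Asks the coordinator to move the given segment to {@code toServer}, provided the destination is
   * not already loading or serving it and has enough free space. The move is registered in
   * {@link #currentlyMovingSegments} and removed again by the load-queue callback once it completes.
   *
   * @return true if the move was issued, false otherwise
   */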
  protected boolean moveSegment(
      final BalancerSegmentHolder segment,
      final ImmutableDruidServer toServer,
      final DruidCoordinatorRuntimeParams params
  )
  {
    final LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServer.getName());

    final ImmutableDruidServer fromServer = segment.getFromServer();
    final DataSegment segmentToMove = segment.getSegment();
    final SegmentId segmentId = segmentToMove.getId();

    if (!toPeon.getSegmentsToLoad().contains(segmentToMove) &&
        (toServer.getSegment(segmentId) == null) &&
        new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) {
      log.debug("Moving [%s] from [%s] to [%s]", segmentId, fromServer.getName(), toServer.getName());

      LoadPeonCallback callback = null;
      try {
        ConcurrentMap<SegmentId, BalancerSegmentHolder> movingSegments =
            currentlyMovingSegments.get(toServer.getTier());
        movingSegments.put(segmentId, segment);
        callback = () -> movingSegments.remove(segmentId);
        coordinator.moveSegment(
            params,
            fromServer,
            toServer,
            segmentToMove,
            callback
        );
        return true;
      }
      catch (Exception e) {
        log.makeAlert(e, StringUtils.format("[%s] : Moving exception", segmentId)).emit();
        if (callback != null) {
          callback.execute();
        }
      }
    }
    return false;
  }
}