/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.rules;

import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.BalancerStrategy;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ReplicationThrottler;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* LoadRules indicate the number of replicants a segment should have in a given tier.
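* <p>
* An illustrative JSON spec for one concrete load rule (the tier names and replica counts here
* are only examples):
* <pre>{@code
* {
*   "type": "loadForever",
*   "tieredReplicants": {
*     "hot": 2,
*     "_default_tier": 1
*   }
* }
* }</pre>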
*/
public abstract class LoadRule implements Rule
{
private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
static final String ASSIGNED_COUNT = "assignedCount";
static final String DROPPED_COUNT = "droppedCount";
public static final String NON_PRIMARY_ASSIGNED_COUNT = "totalNonPrimaryReplicantsLoaded";
public static final String REQUIRED_CAPACITY = "requiredCapacity";

private final Object2IntMap<String> targetReplicants = new Object2IntOpenHashMap<>();
private final Object2IntMap<String> currentReplicants = new Object2IntOpenHashMap<>();

// Cache to hold unused results from strategy call in assignPrimary
private final Map<String, ServerHolder> strategyCache = new HashMap<>();
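
/**
* Runs this rule against the given segment: snapshots the target and current replicant counts,
* assigns any missing replicas, drops any excess ones, and records the capacity this rule
* requires in each tier. For example (illustrative numbers), a segment of 500,000,000 bytes
* with a target of 2 replicas in tier "hot" contributes 1,000,000,000 bytes to the "hot"
* requiredCapacity stat.
*/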
@Override
public CoordinatorStats run(
final DruidCoordinator coordinator,
final DruidCoordinatorRuntimeParams params,
final DataSegment segment
)
{
try {
// get the "snapshots" of targetReplicants and currentReplicants for assignments.
targetReplicants.putAll(getTieredReplicants());
currentReplicants.putAll(params.getSegmentReplicantLookup().getClusterTiers(segment.getId()));
final CoordinatorStats stats = new CoordinatorStats();
assign(params, segment, stats);
drop(params, segment, stats);
for (String tier : targetReplicants.keySet()) {
stats.addToTieredStat(REQUIRED_CAPACITY, tier, segment.getSize() * targetReplicants.getInt(tier));
}
return stats;
}
finally {
targetReplicants.clear();
currentReplicants.clear();
strategyCache.clear();
}
}

@Override
public boolean canLoadSegments()
{
return true;
}
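
/**
* Accumulates, per tier and per datasource, how many replicas of the given segment this rule
* still requires. For example (illustrative numbers), if the rule wants 3 replicas in tier
* "hot" and only 1 is loaded, 2 is added to the segment's datasource count for "hot".
*/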
@Override
public void updateUnderReplicated(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DataSegment segment
)
{
getTieredReplicants().forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
Object2LongMap<String> underReplicationPerDataSource = underReplicatedPerTier.computeIfAbsent(
tier,
ignored -> new Object2LongOpenHashMap<>()
);
((Object2LongOpenHashMap<String>) underReplicationPerDataSource).addTo(
segment.getDataSource(),
Math.max(ruleReplicants - currentReplicants, 0)
);
});
}
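
/**
* Like {@link #updateUnderReplicated}, but caps the expectation at the number of historicals
* actually present in the tier. For example (illustrative numbers), if the rule wants 3
* replicas in tier "hot" but the tier has only 2 historicals, of which 1 holds the segment,
* this adds min(3, 2) - 1 = 1 rather than 2.
*/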
@Override
public void updateUnderReplicatedWithClusterView(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DruidCluster cluster,
DataSegment segment
)
{
getTieredReplicants().forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
Object2LongMap<String> underReplicationPerDataSource = underReplicatedPerTier.computeIfAbsent(
tier,
ignored -> new Object2LongOpenHashMap<>()
);
// guard against tiers referenced by the rule but absent from the cluster
final NavigableSet<ServerHolder> historicalsInTier = cluster.getHistoricalsByTier(tier);
int possibleReplicants = Math.min(ruleReplicants, historicalsInTier == null ? 0 : historicalsInTier.size());
log.debug(
"ruleReplicants: [%d], possibleReplicants: [%d], currentReplicants: [%d]",
ruleReplicants,
possibleReplicants,
currentReplicants
);
((Object2LongOpenHashMap<String>) underReplicationPerDataSource).addTo(
segment.getDataSource(),
Math.max(possibleReplicants - currentReplicants, 0)
);
});
}

/**
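* Assigns replicas of the segment in every target tier. If no replica is loaded or loading
* anywhere yet, a primary replica is assigned first and the remaining tiers are then filled
* to their targets.
*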
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
*/
private void assign(
final DruidCoordinatorRuntimeParams params,
final DataSegment segment,
final CoordinatorStats stats
)
{
// if primary replica already exists or is loading
final int loading = params.getSegmentReplicantLookup().getTotalReplicants(segment.getId());
if (!currentReplicants.isEmpty() || loading > 0) {
assignReplicas(params, segment, stats, null);
} else {
final ServerHolder primaryHolderToLoad = assignPrimary(params, segment);
if (primaryHolderToLoad == null) {
// cluster does not have any replicants and cannot identify primary holder
// this implies that no assignment could be done
return;
}
int numAssigned = 1; // 1 replica (i.e., primary replica) already assigned
final String tier = primaryHolderToLoad.getServer().getTier();
// assign replicas for the rest of the tier
numAssigned += assignReplicasForTier(
tier,
targetReplicants.getOrDefault(tier, 0),
numAssigned, // currentReplicantsInTier: the just-assigned primary is the only replicant so far
params,
createLoadQueueSizeLimitingPredicate(params).and(holder -> !holder.equals(primaryHolderToLoad)),
segment
);
// numAssigned - 1 because we don't want to count the primary assignment
stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned - 1);
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
// assign replicas for the remaining tiers.
assignReplicas(params, segment, stats, tier /* to skip */);
}
}
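
/**
* Builds a predicate that filters out servers whose load queues are already full. For example
* (illustrative value), with maxSegmentsInNodeLoadingQueue = 100, a server with 100 or more
* queued segments is rejected; a non-positive config value disables the limit entirely.
*/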
private static Predicate<ServerHolder> createLoadQueueSizeLimitingPredicate(
final DruidCoordinatorRuntimeParams params
)
{
final int maxSegmentsInNodeLoadingQueue = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
if (maxSegmentsInNodeLoadingQueue <= 0) {
return Objects::nonNull;
} else {
return s -> (s != null && s.getNumberOfSegmentsInQueue() < maxSegmentsInNodeLoadingQueue);
}
}
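
/**
* Returns the active (non-decommissioning) servers in the given tier that also satisfy the
* supplied predicate, or an empty list (and an emitted alert) if the tier has no servers at all.
*/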
private static List<ServerHolder> getFilteredHolders(
final String tier,
final DruidCluster druidCluster,
final Predicate<ServerHolder> predicate
)
{
final NavigableSet<ServerHolder> queue = druidCluster.getHistoricalsByTier(tier);
if (queue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
return Collections.emptyList();
}
Predicate<ServerHolder> isActive = s -> !s.isDecommissioning();
return queue.stream().filter(isActive.and(predicate)).collect(Collectors.toList());
}

/**
* Iterates through each tier and finds a candidate segment home in each; among the candidates,
* selects the server with the highest priority to hold the primary replica.
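* <p>
* For example (illustrative priorities), if tier "hot" offers a candidate with priority 10 and
* tier "cold" one with priority 0, the primary replica is loaded onto the "hot" candidate; the
* "cold" candidate stays in the strategy cache for reuse by the replica-assignment pass.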
*/
@Nullable
private ServerHolder assignPrimary(
final DruidCoordinatorRuntimeParams params,
final DataSegment segment
)
{
ServerHolder topCandidate = null;
for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
final int targetReplicantsInTier = entry.getIntValue();
// sanity check: target number of replicants should be more than zero.
if (targetReplicantsInTier <= 0) {
continue;
}
final String tier = entry.getKey();
String noAvailability = StringUtils.format(
"No available [%s] servers or node capacity to assign primary segment[%s]! Expected Replicants[%d]",
tier,
segment.getId(),
targetReplicantsInTier
);
final List<ServerHolder> holders = getFilteredHolders(
tier,
params.getDruidCluster(),
createLoadQueueSizeLimitingPredicate(params)
);
// no holders satisfy the predicate
if (holders.isEmpty()) {
log.warn(noAvailability);
continue;
}
final ServerHolder candidate = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
if (candidate == null) {
log.warn(noAvailability);
} else {
// cache the result for later use.
strategyCache.put(tier, candidate);
if (topCandidate == null ||
candidate.getServer().getPriority() > topCandidate.getServer().getPriority()) {
topCandidate = candidate;
}
}
}
if (topCandidate != null) {
// remove the cached candidate for the primary's tier, since it has just been consumed
strategyCache.remove(topCandidate.getServer().getTier());
log.info(
"Assigning 'primary' for segment [%s] to server [%s] in tier [%s]",
segment.getId(),
topCandidate.getServer().getName(),
topCandidate.getServer().getTier()
);
topCandidate.getPeon().loadSegment(segment, null);
}
return topCandidate;
}

/**
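* Assigns replicas of the segment in each target tier, skipping the tier (if any) in which the
* primary replica has just been assigned.
*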
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param tierToSkip if not null, assignment is skipped for this tier; used when the primary
* replica has already been assigned there.
*/
private void assignReplicas(
final DruidCoordinatorRuntimeParams params,
final DataSegment segment,
final CoordinatorStats stats,
@Nullable final String tierToSkip
)
{
for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
if (tier.equals(tierToSkip)) {
log.info("Skipping replica assignment for tier [%s]", tier);
continue;
}
final int numAssigned = assignReplicasForTier(
tier,
entry.getIntValue(),
params.getSegmentReplicantLookup().getTotalReplicants(segment.getId(), tier),
params,
createLoadQueueSizeLimitingPredicate(params),
segment
);
stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned);
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
}
}

/**
* @param predicate {@link Predicate} used to pre-filter {@link ServerHolder}s retrieved from {@link DruidCluster}.
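* @return the number of replicas assigned in the tier; may be fewer than requested if
* replication is throttled or no eligible server can be found.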
*/
private int assignReplicasForTier(
final String tier,
final int targetReplicantsInTier,
final int currentReplicantsInTier,
final DruidCoordinatorRuntimeParams params,
final Predicate<ServerHolder> predicate,
final DataSegment segment
)
{
final int numToAssign = targetReplicantsInTier - currentReplicantsInTier;
// if nothing to assign
if (numToAssign <= 0) {
return 0;
}
String noAvailability = StringUtils.format(
"No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
tier,
segment.getId(),
targetReplicantsInTier
);
final List<ServerHolder> holders = getFilteredHolders(tier, params.getDruidCluster(), predicate);
// if no holders available for assignment
if (holders.isEmpty()) {
log.warn(noAvailability);
return 0;
}
final ReplicationThrottler throttler = params.getReplicationManager();
for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) {
if (!throttler.canCreateReplicant(tier)) {
log.info("Throttling replication for segment [%s] in tier [%s]", segment.getId(), tier);
return numAssigned;
}
// Retrieves from cache if available
ServerHolder holder = strategyCache.remove(tier);
// Does strategy call if not in cache
if (holder == null) {
holder = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
}
if (holder == null) {
log.warn(noAvailability);
return numAssigned;
}
holders.remove(holder);
final SegmentId segmentId = segment.getId();
final String holderHost = holder.getServer().getHost();
throttler.registerReplicantCreation(tier, segmentId, holderHost);
log.info(
"Assigning 'replica' for segment [%s] to server [%s] in tier [%s]",
segment.getId(),
holder.getServer().getName(),
holder.getServer().getTier()
);
holder.getPeon().loadSegment(segment, () -> throttler.unregisterReplicantCreation(tier, segmentId));
}
return numToAssign;
}

/**
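* Drops excess replicas of the segment from each tier. As a safety measure, drops happen only
* once loading of the segment has completed across all target tiers.
*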
* @param stats {@link CoordinatorStats} to accumulate drop statistics.
*/
private void drop(
final DruidCoordinatorRuntimeParams params,
final DataSegment segment,
final CoordinatorStats stats
)
{
final DruidCluster druidCluster = params.getDruidCluster();
// As a safety measure, make sure loading has completed before attempting any drops.
if (loadingInProgress(druidCluster)) {
log.info("Loading in progress, skipping drop until loading is complete");
return;
}
for (final Object2IntMap.Entry<String> entry : currentReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
final NavigableSet<ServerHolder> holders = druidCluster.getHistoricalsByTier(tier);
final int numDropped;
if (holders == null) {
log.makeAlert("No holders found for tier[%s]", tier).emit();
numDropped = 0;
} else {
final int currentReplicantsInTier = entry.getIntValue();
final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0);
if (numToDrop > 0) {
numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy());
} else {
numDropped = 0;
}
}
stats.addToTieredStat(DROPPED_COUNT, tier, numDropped);
}
}

/**
* Returns true if at least one tier in the target replica assignment exists in the cluster but
* does not yet have enough replicas.
*/
private boolean loadingInProgress(final DruidCluster druidCluster)
{
for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
// if there are replicants loading in cluster
if (druidCluster.hasTier(tier) && entry.getIntValue() > currentReplicants.getOrDefault(tier, 0)) {
return true;
}
}
return false;
}
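
/**
* Drops up to numToDrop replicas of the segment within a single tier, preferring servers that
* are being decommissioned over active ones, and returns the number actually dropped.
*/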
private static int dropForTier(
final int numToDrop,
final NavigableSet<ServerHolder> holdersInTier,
final DataSegment segment,
final BalancerStrategy balancerStrategy
)
{
Map<Boolean, TreeSet<ServerHolder>> holders = holdersInTier.stream()
.filter(s -> s.isServingSegment(segment))
.collect(Collectors.partitioningBy(
ServerHolder::isDecommissioning,
Collectors.toCollection(TreeSet::new)
));
TreeSet<ServerHolder> decommissioningServers = holders.get(true);
TreeSet<ServerHolder> activeServers = holders.get(false);
int left = dropSegmentFromServers(balancerStrategy, segment, decommissioningServers, numToDrop);
if (left > 0) {
left = dropSegmentFromServers(balancerStrategy, segment, activeServers, left);
}
if (left != 0) {
log.warn("I have no servers serving [%s]?", segment.getId());
}
return numToDrop - left;
}
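
/**
* Drops the segment from servers in the order chosen by the balancer strategy, skipping any
* server that is no longer serving it, and returns how many of the requested drops were left
* undone.
*/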
private static int dropSegmentFromServers(
BalancerStrategy balancerStrategy,
DataSegment segment,
NavigableSet<ServerHolder> holders, int numToDrop
)
{
final Iterator<ServerHolder> iterator = balancerStrategy.pickServersToDrop(segment, holders);
while (numToDrop > 0) {
if (!iterator.hasNext()) {
break;
}
final ServerHolder holder = iterator.next();
if (holder.isServingSegment(segment)) {
log.info(
"Dropping segment [%s] on server [%s] in tier [%s]",
segment.getId(),
holder.getServer().getName(),
holder.getServer().getTier()
);
holder.getPeon().dropSegment(segment, null);
numToDrop--;
} else {
log.warn(
"Server [%s] is no longer serving segment [%s], skipping drop.",
holder.getServer().getName(),
segment.getId()
);
}
}
return numToDrop;
}
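
/**
* Validates that the replicant map is non-empty and contains only non-negative values.
* For example (illustrative tiers), {@code {"hot": 2, "_default_tier": 1}} is valid, while an
* empty map, a null value, or {@code {"hot": -1}} each throw an {@link IAE}.
*/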
protected static void validateTieredReplicants(final Map<String, Integer> tieredReplicants)
{
if (tieredReplicants.isEmpty()) {
throw new IAE("A rule with empty tiered replicants is invalid");
}
for (Map.Entry<String, Integer> entry : tieredReplicants.entrySet()) {
if (entry.getValue() == null) {
throw new IAE("Replicant value cannot be empty");
}
if (entry.getValue() < 0) {
throw new IAE("Replicant value [%d] is less than 0, which is not allowed", entry.getValue());
}
}
}
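
/**
* Returns the target number of replicas per tier, keyed by tier name.
*/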
public abstract Map<String, Integer> getTieredReplicants();
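
/**
* Returns the target number of replicas for the given tier.
*/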
public abstract int getNumReplicants(String tier);
}