blob: a4eb5077224ce9239bd59971a8a8480301fa3a62 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Load shedding strategy which will attempt to shed exactly one bundle on brokers which are overloaded, that is, whose
* maximum system resource usage exceeds loadBalancerBrokerOverloadedThresholdPercentage. To see which resources are
* considered when determining the maximum system resource, see {@link LocalBrokerData#getMaxResourceUsage()}. A bundle
* is recommended for unloading off that broker if and only if the following conditions hold: The broker has at
* least two bundles assigned and the broker has at least one bundle that has not been unloaded recently according to
* LoadBalancerSheddingGracePeriodMinutes. The unloaded bundle will be the most expensive bundle in terms of message
* rate that has not been recently unloaded. Note that this strategy does not take into account "underloaded" brokers
* when determining which bundles to unload. If you are looking for a strategy that spreads load evenly across
* all brokers, see {@link ThresholdShedder}.
*/
public class OverloadShedder implements LoadSheddingStrategy {
private static final Logger log = LoggerFactory.getLogger(OverloadShedder.class);
private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
/**
* Attempt to shed some bundles off every broker which is overloaded.
*
* @param loadData
* The load data to used to make the unloading decision.
* @param conf
* The service configuration.
* @return A map from bundles to unload to the brokers on which they are loaded.
*/
@Override
public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
// Check every broker and select
loadData.getBrokerData().forEach((broker, brokerData) -> {
final LocalBrokerData localData = brokerData.getLocalData();
final double currentUsage = localData.getMaxResourceUsage();
if (currentUsage < overloadThreshold) {
if (log.isDebugEnabled()) {
log.debug("[{}] Broker is not overloaded, ignoring at this point ({})", broker,
localData.printResourceUsage());
}
return;
}
// We want to offload enough traffic such that this broker will go below the overload threshold
// Also, add a small margin so that this broker won't be very close to the threshold edge.
double percentOfTrafficToOffload = currentUsage - overloadThreshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;
log.info(
"Attempting to shed load on {}, which has resource usage {}% above threshold {}%"
+ " -- Offloading at least {} MByte/s of traffic ({})",
broker, 100 * currentUsage, 100 * overloadThreshold, minimumThroughputToOffload / 1024 / 1024,
localData.printResourceUsage());
MutableDouble trafficMarkedToOffload = new MutableDouble(0);
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
if (localData.getBundles().size() > 1) {
// Sort bundles by throughput, then pick the biggest N which combined
// make up for at least the minimum throughput to offload
loadData.getBundleDataForLoadShedding().entrySet().stream()
.filter(e -> localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData
.getMsgThroughputOut();
return Pair.of(bundle, throughput);
}).filter(e -> {
// Only consider bundles that were not already unloaded recently
return !recentlyUnloadedBundles.containsKey(e.getLeft());
}).sorted((e1, e2) -> {
// Sort by throughput in reverse order
return Double.compare(e2.getRight(), e1.getRight());
}).forEach(e -> {
if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload
|| atLeastOneBundleSelected.isFalse()) {
selectedBundlesCache.put(broker, e.getLeft());
trafficMarkedToOffload.add(e.getRight());
atLeastOneBundleSelected.setTrue();
}
});
} else if (localData.getBundles().size() == 1) {
log.warn(
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
localData.getBundles().iterator().next(), broker);
} else {
log.warn("Broker {} is overloaded despite having no bundles", broker);
}
});
return selectedBundlesCache;
}
}