blob: 2f5c32197c1fd1c8eedc9a01540c0f70dd76b90a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.models;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
/**
* Defines the information of top k highest-loaded bundles.
*/
@Getter
@ToString
@EqualsAndHashCode
@Slf4j
public class TopKBundles {
// temp array for sorting
private final List<Map.Entry<String, ? extends Comparable>> arr = new ArrayList<>();
private final TopBundlesLoadData loadData = new TopBundlesLoadData();
private final PulsarService pulsar;
private final SimpleResourceAllocationPolicies allocationPolicies;
public TopKBundles(PulsarService pulsar) {
this.pulsar = pulsar;
this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
}
/**
* Update the topK bundles from the input bundleStats.
*
* @param bundleStats bundle stats.
* @param topk top k bundle stats to select.
*/
public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
arr.clear();
try {
var isLoadBalancerSheddingBundlesWithPoliciesEnabled =
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
for (var etr : bundleStats.entrySet()) {
String bundle = etr.getKey();
if (bundle.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
continue;
}
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && hasPolicies(bundle)) {
continue;
}
arr.add(etr);
}
var topKBundlesLoadData = loadData.getTopBundlesLoadData();
topKBundlesLoadData.clear();
if (arr.isEmpty()) {
return;
}
topk = Math.min(topk, arr.size());
partitionSort(arr, topk);
for (int i = topk - 1; i >= 0; i--) {
var etr = arr.get(i);
topKBundlesLoadData.add(
new TopBundlesLoadData.BundleLoadData(etr.getKey(), (NamespaceBundleStats) etr.getValue()));
}
} finally {
arr.clear();
}
}
static void partitionSort(List<Map.Entry<String, ? extends Comparable>> arr, int k) {
int start = 0;
int end = arr.size() - 1;
int target = k - 1;
while (start < end) {
int lo = start;
int hi = end;
int mid = lo;
var pivot = arr.get(hi).getValue();
while (mid <= hi) {
int cmp = pivot.compareTo(arr.get(mid).getValue());
if (cmp < 0) {
var tmp = arr.get(lo);
arr.set(lo++, arr.get(mid));
arr.set(mid++, tmp);
} else if (cmp > 0) {
var tmp = arr.get(mid);
arr.set(mid, arr.get(hi));
arr.set(hi--, tmp);
} else {
mid++;
}
}
if (lo <= target && target < mid) {
end = lo;
break;
}
if (target < lo) {
end = lo - 1;
} else {
start = mid;
}
}
Collections.sort(arr.subList(0, end), (a, b) -> b.getValue().compareTo(a.getValue()));
}
private boolean hasPolicies(String bundle) {
NamespaceName namespace = NamespaceName.get(LoadManagerShared.getNamespaceNameFromBundleName(bundle));
if (allocationPolicies.areIsolationPoliciesPresent(namespace)) {
return true;
}
try {
var antiAffinityGroupOptional =
LoadManagerShared.getNamespaceAntiAffinityGroup(pulsar, namespace.toString());
if (antiAffinityGroupOptional.isPresent()) {
return true;
}
} catch (MetadataStoreException e) {
log.error("Failed to get localPolicies for bundle:{}.", bundle, e);
throw new RuntimeException(e);
}
return false;
}
}