blob: 5bc7bf952b1293f480eee535df0519f274baff2b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.Charsets;
import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
/**
* Class that will return the broker host usage.
*/
@Slf4j
public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
private long lastCollection;
private double lastTotalNicUsageTx;
private double lastTotalNicUsageRx;
private double lastCpuUsage;
private double lastCpuTotalTime;
private OperatingSystemMXBean systemBean;
private SystemResourceUsage usage;
private final Optional<Double> overrideBrokerNicSpeedGbps;
private final boolean isCGroupsEnabled;
private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage";
private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";
public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
this(
pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(),
pulsar.getConfiguration().getLoadBalancerOverrideBrokerNicSpeedGbps(),
pulsar.getLoadManagerExecutor()
);
}
public LinuxBrokerHostUsageImpl(int hostUsageCheckIntervalMin,
Optional<Double> overrideBrokerNicSpeedGbps,
ScheduledExecutorService executorService) {
this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
this.lastCollection = 0L;
this.usage = new SystemResourceUsage();
this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps;
boolean isCGroupsEnabled = false;
try {
isCGroupsEnabled = Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
} catch (Exception e) {
log.warn("Failed to check cgroup CPU usage file: {}", e.getMessage());
}
this.isCGroupsEnabled = isCGroupsEnabled;
// Call now to initialize values before the constructor returns
calculateBrokerHostUsage();
executorService.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::calculateBrokerHostUsage),
hostUsageCheckIntervalMin,
hostUsageCheckIntervalMin, TimeUnit.MINUTES);
}
@Override
public SystemResourceUsage getBrokerHostUsage() {
return usage;
}
@Override
public void calculateBrokerHostUsage() {
List<String> nics = getNics();
double totalNicLimit = getTotalNicLimitKbps(nics);
double totalNicUsageTx = getTotalNicUsageTxKb(nics);
double totalNicUsageRx = getTotalNicUsageRxKb(nics);
double totalCpuLimit = getTotalCpuLimit();
long now = System.currentTimeMillis();
double elapsedSeconds = (now - lastCollection) / 1000d;
if (elapsedSeconds <= 0) {
log.warn("elapsedSeconds {} is not expected, skip this round of calculateBrokerHostUsage", elapsedSeconds);
return;
}
SystemResourceUsage usage = new SystemResourceUsage();
double cpuUsage = getTotalCpuUsage(elapsedSeconds);
if (lastCollection == 0L) {
usage.setMemory(getMemUsage());
usage.setBandwidthIn(new ResourceUsage(0d, totalNicLimit));
usage.setBandwidthOut(new ResourceUsage(0d, totalNicLimit));
} else {
double nicUsageTx = (totalNicUsageTx - lastTotalNicUsageTx) / elapsedSeconds;
double nicUsageRx = (totalNicUsageRx - lastTotalNicUsageRx) / elapsedSeconds;
usage.setMemory(getMemUsage());
usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit));
usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit));
}
lastTotalNicUsageTx = totalNicUsageTx;
lastTotalNicUsageRx = totalNicUsageRx;
lastCollection = System.currentTimeMillis();
this.usage = usage;
usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
}
private double getTotalCpuLimit() {
if (isCGroupsEnabled) {
try {
long quota = readLongFromFile(CGROUPS_CPU_LIMIT_QUOTA_PATH);
long period = readLongFromFile(CGROUPS_CPU_LIMIT_PERIOD_PATH);
if (quota > 0) {
return 100.0 * quota / period;
}
} catch (IOException e) {
log.warn("Failed to read CPU quotas from cgroups", e);
// Fallback to availableProcessors
}
}
// Fallback to JVM reported CPU quota
return 100 * Runtime.getRuntime().availableProcessors();
}
private double getTotalCpuUsage(double elapsedTimeSeconds) {
if (isCGroupsEnabled) {
return getTotalCpuUsageForCGroup(elapsedTimeSeconds);
} else {
return getTotalCpuUsageForEntireHost();
}
}
/**
* Reads first line of /proc/stat to get total cpu usage.
*
* <pre>
* cpu user nice system idle iowait irq softirq steal guest guest_nice
* cpu 317808 128 58637 2503692 7634 0 13472 0 0 0
* </pre>
*
* Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this
* far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal.
*/
private double getTotalCpuUsageForEntireHost() {
try (Stream<String> stream = Files.lines(Paths.get("/proc/stat"))) {
String[] words = stream.findFirst().get().split("\\s+");
long total = Arrays.stream(words).filter(s -> !s.contains("cpu")).mapToLong(Long::parseLong).sum();
long idle = Long.parseLong(words[4]);
long usage = total - idle;
double currentUsage = (usage - lastCpuUsage) / (total - lastCpuTotalTime) * getTotalCpuLimit();
lastCpuUsage = usage;
lastCpuTotalTime = total;
return currentUsage;
} catch (IOException e) {
log.error("Failed to read CPU usage from /proc/stat", e);
return -1;
}
}
private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
try {
long usage = readLongFromFile(CGROUPS_CPU_USAGE_PATH);
double currentUsage = usage - lastCpuUsage;
lastCpuUsage = usage;
return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1);
} catch (IOException e) {
log.error("Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e);
return -1;
}
}
private ResourceUsage getMemUsage() {
double total = ((double) systemBean.getTotalPhysicalMemorySize()) / (1024 * 1024);
double free = ((double) systemBean.getFreePhysicalMemorySize()) / (1024 * 1024);
return new ResourceUsage(total - free, total);
}
private List<String> getNics() {
try (Stream<Path> stream = Files.list(Paths.get("/sys/class/net/"))) {
return stream.filter(this::isPhysicalNic).map(path -> path.getFileName().toString())
.collect(Collectors.toList());
} catch (IOException e) {
log.error("Failed to find NICs", e);
return Collections.emptyList();
}
}
public int getNicCount() {
return getNics().size();
}
private boolean isPhysicalNic(Path path) {
if (!path.toString().contains("/virtual/")) {
try {
Files.readAllBytes(path.resolve("speed"));
return true;
} catch (Exception e) {
// wireless nics don't report speed, ignore them.
return false;
}
}
return false;
}
private Path getNicSpeedPath(String nic) {
return Paths.get(String.format("/sys/class/net/%s/speed", nic));
}
private double getTotalNicLimitKbps(List<String> nics) {
// Use the override value as configured. Return the total max speed across all available NICs, converted
// from Gbps into Kbps
return overrideBrokerNicSpeedGbps.map(aDouble -> aDouble * nics.size() * 1024 * 1024)
.orElseGet(() -> nics.stream().mapToDouble(s -> {
// Nic speed is in Mbits/s, return kbits/s
try {
return Double.parseDouble(new String(Files.readAllBytes(getNicSpeedPath(s))));
} catch (IOException e) {
log.error("Failed to read speed for nic " + s, e);
return 0d;
}
}).sum() * 1024);
}
private Path getNicTxPath(String nic) {
return Paths.get(String.format("/sys/class/net/%s/statistics/tx_bytes", nic));
}
private Path getNicRxPath(String nic) {
return Paths.get(String.format("/sys/class/net/%s/statistics/rx_bytes", nic));
}
private double getTotalNicUsageRxKb(List<String> nics) {
return nics.stream().mapToDouble(s -> {
try {
return Double.parseDouble(new String(Files.readAllBytes(getNicRxPath(s))));
} catch (IOException e) {
log.error("Failed to read rx_bytes for NIC " + s, e);
return 0d;
}
}).sum() * 8 / 1024;
}
private double getTotalNicUsageTxKb(List<String> nics) {
return nics.stream().mapToDouble(s -> {
try {
return Double.parseDouble(new String(Files.readAllBytes(getNicTxPath(s))));
} catch (IOException e) {
log.error("Failed to read tx_bytes for NIC " + s, e);
return 0d;
}
}).sum() * 8 / 1024;
}
private static long readLongFromFile(String path) throws IOException {
return Long.parseLong(new String(Files.readAllBytes(Paths.get(path)), Charsets.UTF_8).trim());
}
}