blob: e388d8b561459f14c84b7d46ed0f12fa8ec2765d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.coordinator;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.util.Constants;
/**
* AccessClusterLoadChecker use the cluster load metrics including memory and healthy to
* filter and count available nodes numbers and reject if the number do not reach the threshold.
*/
public class AccessClusterLoadChecker extends AbstractAccessChecker {
private static final Logger LOG = LoggerFactory.getLogger(AccessClusterLoadChecker.class);
private final ClusterManager clusterManager;
private final double memoryPercentThreshold;
private final int availableServerNumThreshold;
public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
super(accessManager);
clusterManager = accessManager.getClusterManager();
CoordinatorConf conf = accessManager.getCoordinatorConf();
this.memoryPercentThreshold = conf.getDouble(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE);
this.availableServerNumThreshold = conf.getInteger(
CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD,
conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX));
}
public AccessCheckResult check(AccessInfo accessInfo) {
Set<String> tags = accessInfo.getTags();
List<ServerNode> servers = clusterManager.getServerList(tags);
int size = (int) servers.stream().filter(ServerNode::isHealthy).filter(this::checkMemory).count();
if (size >= availableServerNumThreshold) {
return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
} else {
String msg = String.format("Denied by AccessClusterLoadChecker accessInfo[%s], "
+ "total %s nodes, %s available nodes, "
+ "memory percent threshold %s, available num threshold %s.",
accessInfo, servers.size(), size, memoryPercentThreshold, availableServerNumThreshold);
LOG.warn(msg);
CoordinatorMetrics.counterTotalLoadDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}
}
private boolean checkMemory(ServerNode serverNode) {
double availableMemory = (double) serverNode.getAvailableMemory();
double total = (double) serverNode.getTotalMemory();
double availablePercent = availableMemory / (total / 100.0);
return Double.compare(availablePercent, memoryPercentThreshold) >= 0;
}
public ClusterManager getClusterManager() {
return clusterManager;
}
public double getMemoryPercentThreshold() {
return memoryPercentThreshold;
}
public int getAvailableServerNumThreshold() {
return availableServerNumThreshold;
}
public void close() {
}
}