blob: 84c7947af0ba3ba7f9e1930ae1e6da090a8ae335 [file] [log] [blame]
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
package scheduler
import (
// HealthChecker holds the state of the background scheduler health check: the
// cluster context that is checked, the ID under which its config map callback
// is registered, and the mutable run state (stop channel, period, enabled).
//
// NOTE(review): the methods of this type call RLock/RUnlock and Lock/Unlock on
// the receiver, so a read/write lock is presumably embedded in this struct; no
// such field is visible in this view — confirm against the full file.
type HealthChecker struct {
// context is the cluster context passed to GetSchedulerHealthStatus by runOnce.
context *ClusterContext
// confWatcherId is the ID used when registering the config map callback.
confWatcherId string
// mutable values require locking
// stopChan points at the stop channel of the running checker goroutine; it is
// nil when the periodic checker is disabled or has been stopped.
stopChan *chan struct{}
// period is the active check interval; startInternal sets it to 0 when the
// periodic checker is disabled.
period time.Duration
// enabled is set to true by startInternal when a positive period is
// configured and to false otherwise.
enabled bool
func NewHealthChecker(schedulerContext *ClusterContext) *HealthChecker {
checker := &HealthChecker{
context: schedulerContext,
checker.confWatcherId = fmt.Sprintf("health-checker-%p", checker)
return checker
func (c *HealthChecker) GetPeriod() time.Duration {
defer c.RUnlock()
return c.period
func (c *HealthChecker) IsEnabled() bool {
defer c.RUnlock()
return c.enabled
func (c *HealthChecker) readPeriod() time.Duration {
value, ok := configs.GetConfigMap()[configs.HealthCheckInterval]
if !ok {
return configs.DefaultHealthCheckInterval
result, err := time.ParseDuration(value)
if err != nil {
log.Log(log.SchedHealth).Warn("Failed to parse configuration value",
zap.String("key", configs.HealthCheckInterval),
zap.String("value", value),
return configs.DefaultHealthCheckInterval
if result < 0 {
result = 0
return result
// Start executes healthCheck service in the background
// NOTE(review): the body of this method is not visible in this view; it
// presumably delegates to startInternal — confirm against the full file.
func (c *HealthChecker) Start() {
// startInternal registers the config map callback and then, depending on the
// configured period, either starts the periodic checker goroutine (period > 0)
// or marks the periodic checker as disabled (period == 0).
//
// NOTE(review): several statements appear to be missing from this view: the
// body of the runImmediately branch, the lock acquisition that pairs with the
// deferred Unlock, and the bodies of both select cases — confirm against the
// full file before relying on this description.
func (c *HealthChecker) startInternal(runImmediately bool) {
if runImmediately {
defer c.Unlock()
// The callback only spawns a goroutine; reloadConfig itself runs
// asynchronously from the config map notification.
configs.AddConfigMapCallback(c.confWatcherId, func() {
go c.reloadConfig()
period := c.readPeriod()
if period > 0 {
// The goroutine below selects on this local channel; c.stopChan only keeps
// a pointer to it so that Stop can signal it later.
stopChan := make(chan struct{})
c.stopChan = &stopChan
c.period = period
c.enabled = true
log.Log(log.SchedHealth).Info("Starting periodic health checker", zap.Duration("interval", period))
go func() {
// the ticker uses the period read above, not c.period
ticker := time.NewTicker(period)
for {
select {
case <-stopChan:
case <-ticker.C:
} else {
// disabled
c.stopChan = nil
c.period = 0
c.enabled = false
log.Log(log.SchedHealth).Info("Periodic health checker disabled")
func (c *HealthChecker) Stop() {
defer c.Unlock()
if c.stopChan != nil {
log.Log(log.SchedHealth).Info("Stopping periodic health checker")
*c.stopChan <- struct{}{}
c.stopChan = nil
// NOTE(review): the body of Restart is not visible in this view; presumably
// it stops the checker and starts it again — confirm against the full file.
func (c *HealthChecker) Restart() {
// reloadConfig is run in its own goroutine by the config map callback that
// startInternal registers; it acts only when isRestartNeeded reports that the
// configured period differs from the active one.
// NOTE(review): the body of the branch is not visible in this view;
// presumably it restarts the checker — confirm against the full file.
func (c *HealthChecker) reloadConfig() {
if c.isRestartNeeded() {
func (c *HealthChecker) isRestartNeeded() bool {
defer c.Unlock()
period := c.readPeriod()
return period != c.period
func (c *HealthChecker) runOnce() {
schedulerMetrics := metrics.GetSchedulerMetrics()
result := GetSchedulerHealthStatus(schedulerMetrics, c.context)
updateSchedulerLastHealthStatus(&result, c.context)
if !result.Healthy {
for _, v := range result.HealthChecks {
if v.Succeeded {
log.Log(log.SchedHealth).Warn("Scheduler is not healthy",
zap.String("name", v.Name),
zap.String("description", v.Description),
zap.String("message", v.DiagnosisMessage))
} else {
log.Log(log.SchedHealth).Debug("Scheduler is healthy")
// updateSchedulerLastHealthStatus is called by runOnce with the latest health
// check result and the cluster context that was checked.
// NOTE(review): the body is not visible in this view; presumably it records
// latest on schedulerContext — confirm against the full file.
func updateSchedulerLastHealthStatus(latest *dao.SchedulerHealthDAOInfo, schedulerContext *ClusterContext) {
func GetSchedulerHealthStatus(metrics *metrics.SchedulerMetrics, schedulerContext *ClusterContext) dao.SchedulerHealthDAOInfo {
var healthInfo []dao.HealthCheckInfo
healthInfo = append(healthInfo, checkSchedulingErrors(metrics))
healthInfo = append(healthInfo, checkFailedNodes(metrics))
healthInfo = append(healthInfo, checkSchedulingContext(schedulerContext)...)
healthy := true
for _, h := range healthInfo {
if !h.Succeeded {
healthy = false
return dao.SchedulerHealthDAOInfo{
Healthy: healthy,
HealthChecks: healthInfo,
func CreateCheckInfo(succeeded bool, name, description, message string) dao.HealthCheckInfo {
return dao.HealthCheckInfo{
Name: name,
Succeeded: succeeded,
Description: description,
DiagnosisMessage: message,
func checkSchedulingErrors(metrics *metrics.SchedulerMetrics) dao.HealthCheckInfo {
schedulingErrors, err := metrics.GetSchedulingErrors()
if err != nil {
return CreateCheckInfo(false, "Scheduling errors", "Check for scheduling error entries in metrics", err.Error())
diagnosisMsg := fmt.Sprintf("There were %v scheduling errors logged in the metrics", schedulingErrors)
return CreateCheckInfo(schedulingErrors == 0, "Scheduling errors", "Check for scheduling error entries in metrics", diagnosisMsg)
func checkFailedNodes(metrics *metrics.SchedulerMetrics) dao.HealthCheckInfo {
failedNodes, err := metrics.GetFailedNodes()
if err != nil {
return CreateCheckInfo(false, "Failed nodes", "Check for failed nodes entries in metrics", err.Error())
diagnosisMsg := fmt.Sprintf("There were %v failed nodes logged in the metrics", failedNodes)
return CreateCheckInfo(failedNodes == 0, "Failed nodes", "Check for failed nodes entries in metrics", diagnosisMsg)
func checkSchedulingContext(schedulerContext *ClusterContext) []dao.HealthCheckInfo {
// 1. check resources
// 1.1 check for negative resources
var partitionsWithNegResources []string
var nodesWithNegResources []string
// 1.3 node allocated resource <= total resource of the node
var allocationMismatch []string
// 1.2 total partition resource = sum of node resources
var totalResourceMismatch []string
// 1.4 node total resource = allocated resource + occupied resource + available resource
var nodeTotalMismatch []string
// 1.5 node capacity >= allocated resources on the node
var nodeCapacityMismatch []string
// 2. check reservation/node ration
var partitionReservationRatio []float32
// 3. check for orphan allocations
orphanAllocationsOnNode := make([]*objects.Allocation, 0)
orphanAllocationsOnApp := make([]*objects.Allocation, 0)
for _, part := range schedulerContext.GetPartitionMapClone() {
if part.GetAllocatedResource().HasNegativeValue() {
partitionsWithNegResources = append(partitionsWithNegResources, part.Name)
if part.GetTotalPartitionResource().HasNegativeValue() {
partitionsWithNegResources = append(partitionsWithNegResources, part.Name)
sumNodeResources := resources.NewResource()
sumNodeAllocatedResources := resources.NewResource()
sumReservation := 0
for _, node := range part.GetNodes() {
sumReservation += len(node.GetReservationKeys())
calculatedTotalNodeRes := resources.Add(node.GetAllocatedResource(), node.GetOccupiedResource())
if !resources.Equals(node.GetCapacity(), calculatedTotalNodeRes) {
nodeTotalMismatch = append(nodeTotalMismatch, node.NodeID)
if node.GetAllocatedResource().HasNegativeValue() {
nodesWithNegResources = append(nodesWithNegResources, node.NodeID)
if node.GetAvailableResource().HasNegativeValue() {
nodesWithNegResources = append(nodesWithNegResources, node.NodeID)
if node.GetCapacity().HasNegativeValue() {
nodesWithNegResources = append(nodesWithNegResources, node.NodeID)
if node.GetOccupiedResource().HasNegativeValue() {
nodesWithNegResources = append(nodesWithNegResources, node.NodeID)
if !resources.StrictlyGreaterThanOrEquals(node.GetCapacity(), node.GetAllocatedResource()) {
nodeCapacityMismatch = append(nodeCapacityMismatch, node.NodeID)
orphanAllocationsOnNode = append(orphanAllocationsOnNode, checkNodeAllocations(node, part)...)
// check if there are allocations assigned to an app but there are missing from the nodes
for _, app := range part.GetApplications() {
orphanAllocationsOnApp = append(orphanAllocationsOnApp, checkAppAllocations(app, part.nodes)...)
partitionReservationRatio = append(partitionReservationRatio, float32(sumReservation)/(float32(part.GetTotalNodeCount())))
if !resources.Equals(sumNodeAllocatedResources, part.GetAllocatedResource()) {
allocationMismatch = append(allocationMismatch, part.Name)
if !resources.EqualsOrEmpty(sumNodeResources, part.GetTotalPartitionResource()) {
totalResourceMismatch = append(totalResourceMismatch, part.Name)
var info = make([]dao.HealthCheckInfo, 9)
info[0] = CreateCheckInfo(len(partitionsWithNegResources) == 0, "Negative resources",
"Check for negative resources in the partitions",
fmt.Sprintf("Partitions with negative resources: %q", partitionsWithNegResources))
info[1] = CreateCheckInfo(len(nodesWithNegResources) == 0, "Negative resources",
"Check for negative resources in the nodes",
fmt.Sprintf("Nodes with negative resources: %q", nodesWithNegResources))
info[2] = CreateCheckInfo(len(allocationMismatch) == 0, "Consistency of data",
"Check if a partition's allocated resource <= total resource of the partition",
fmt.Sprintf("Partitions with inconsistent data: %q", allocationMismatch))
info[3] = CreateCheckInfo(len(totalResourceMismatch) == 0, "Consistency of data",
"Check if total partition resource == sum of the node resources from the partition",
fmt.Sprintf("Partitions with inconsistent data: %q", totalResourceMismatch))
info[4] = CreateCheckInfo(len(nodeTotalMismatch) == 0, "Consistency of data",
"Check if node total resource = allocated resource + occupied resource + available resource",
fmt.Sprintf("Nodes with inconsistent data: %q", nodeTotalMismatch))
info[5] = CreateCheckInfo(len(nodeCapacityMismatch) == 0, "Consistency of data",
"Check if node capacity >= allocated resources on the node",
fmt.Sprintf("Nodes with inconsistent data: %q", nodeCapacityMismatch))
// mark it as succeeded for a while until we will know what is not considered a normal value anymore
info[6] = CreateCheckInfo(true, "Reservation check",
"Check the reservation nr compared to the number of nodes",
fmt.Sprintf("Reservation/node nr ratio: %f", partitionReservationRatio))
info[7] = CreateCheckInfo(len(orphanAllocationsOnNode) == 0, "Orphan allocation on node check",
"Check if there are orphan allocations on the nodes",
fmt.Sprintf("Orphan allocations: %v", orphanAllocationsOnNode))
info[8] = CreateCheckInfo(len(orphanAllocationsOnApp) == 0, "Orphan allocation on app check",
"Check if there are orphan allocations on the applications",
fmt.Sprintf("OrphanAllocations: %v", orphanAllocationsOnApp))
return info
func checkAppAllocations(app *objects.Application, nodes objects.NodeCollection) []*objects.Allocation {
orphanAllocationsOnApp := make([]*objects.Allocation, 0)
for _, alloc := range app.GetAllAllocations() {
if node := nodes.GetNode(alloc.GetNodeID()); node != nil {
if node.GetAllocation(alloc.GetAllocationKey()) == nil {
orphanAllocationsOnApp = append(orphanAllocationsOnApp, alloc)
} else {
orphanAllocationsOnApp = append(orphanAllocationsOnApp, alloc)
return orphanAllocationsOnApp
func checkNodeAllocations(node *objects.Node, partitionContext *PartitionContext) []*objects.Allocation {
orphanAllocationsOnNode := make([]*objects.Allocation, 0)
for _, alloc := range node.GetAllAllocations() {
if app := partitionContext.getApplication(alloc.GetApplicationID()); app != nil {
if !app.IsAllocationAssignedToApp(alloc) {
orphanAllocationsOnNode = append(orphanAllocationsOnNode, alloc)
} else {
orphanAllocationsOnNode = append(orphanAllocationsOnNode, alloc)
return orphanAllocationsOnNode