blob: 6ea167ef7a0c40ae93da1ad257e97e16fedfbab3 [file] [log] [blame]
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package objects
import (
const (
UnknownInstanceType = "UNKNOWN"
type Node struct {
// Fields for fast access These fields are considered read only.
// Values should only be set when creating a new node and never changed.
NodeID string
Hostname string
Rackname string
Partition string
// Private fields need protection
attributes map[string]string
totalResource *resources.Resource
occupiedResource *resources.Resource
allocatedResource *resources.Resource
availableResource *resources.Resource
allocations map[string]*Allocation
schedulable bool
reservations map[string]*reservation // a map of reservations
listeners []NodeListener // a list of node listeners
nodeEvents *nodeEvents
func NewNode(proto *si.NodeInfo) *Node {
// safeguard against panic
if proto == nil {
return nil
sn := &Node{
NodeID: proto.NodeID,
reservations: make(map[string]*reservation),
totalResource: resources.NewResourceFromProto(proto.SchedulableResource),
allocatedResource: resources.NewResource(),
occupiedResource: resources.NewResourceFromProto(proto.OccupiedResource),
allocations: make(map[string]*Allocation),
schedulable: true,
listeners: make([]NodeListener, 0),
sn.nodeEvents = newNodeEvents(sn, events.GetEventSystem())
// initialise available resources
var err error
sn.availableResource, err = resources.SubErrorNegative(sn.totalResource, sn.occupiedResource)
if err != nil {
log.Log(log.SchedNode).Error("New node created with no available resources",
return sn
func (sn *Node) String() string {
if sn == nil {
return "node is nil"
return fmt.Sprintf("NodeID %s, Partition %s, Schedulable %t, Total %s, Allocated %s, #allocations %d",
sn.NodeID, sn.Partition, sn.schedulable, sn.totalResource, sn.allocatedResource, len(sn.allocations))
// Set the attributes and fast access fields.
// Unlocked call: should only be called on create or from test code
func (sn *Node) initializeAttribute(newAttributes map[string]string) {
sn.attributes = newAttributes
// Avoid passing empty nodeAttributes in initializeAttribute
if len(sn.attributes) == 0 {
sn.attributes = map[string]string{}
sn.Hostname = sn.attributes[common.HostName]
sn.Rackname = sn.attributes[common.RackName]
sn.Partition = sn.attributes[common.NodePartition]
// Get an attribute by name. The most used attributes can be directly accessed via the
// fields: HostName, RackName and Partition.
// This is a lock free call. All attributes are considered read only
func (sn *Node) GetAttribute(key string) string {
return sn.attributes[key]
func (sn *Node) GetAttributes() map[string]string {
return sn.attributes
// Get InstanceType of this node.
// This is a lock free call because all attributes are considered read only
func (sn *Node) GetInstanceType() string {
itype := sn.GetAttribute(common.InstanceType)
if itype != "" {
return itype
return UnknownInstanceType
// GetReservationKeys Return an array of all reservation keys for the node.
// This will return an empty array if there are no reservations.
// Visible for tests
func (sn *Node) GetReservationKeys() []string {
defer sn.RUnlock()
keys := make([]string, 0)
for key := range sn.reservations {
keys = append(keys, key)
return keys
func (sn *Node) GetCapacity() *resources.Resource {
defer sn.RUnlock()
return sn.totalResource.Clone()
func (sn *Node) SetCapacity(newCapacity *resources.Resource) *resources.Resource {
defer sn.notifyListeners()
defer sn.Unlock()
if resources.Equals(sn.totalResource, newCapacity) {
log.Log(log.SchedNode).Debug("skip updating capacity, not changed")
return nil
delta := resources.Sub(newCapacity, sn.totalResource)
sn.totalResource = newCapacity
return delta
func (sn *Node) GetOccupiedResource() *resources.Resource {
defer sn.RUnlock()
return sn.occupiedResource.Clone()
func (sn *Node) SetOccupiedResource(occupiedResource *resources.Resource) {
defer sn.notifyListeners()
defer sn.Unlock()
if resources.Equals(sn.occupiedResource, occupiedResource) {
log.Log(log.SchedNode).Debug("skip updating occupiedResource, not changed")
sn.occupiedResource = occupiedResource
// refresh node available resource based on the latest total, allocated and occupied resources.
// this call assumes the caller already acquires the lock.
func (sn *Node) refreshAvailableResource() {
sn.availableResource = sn.totalResource.Clone()
// check if any quantity is negative: a nil resource is all 0's
if !resources.StrictlyGreaterThanOrEquals(sn.availableResource, nil) {
log.Log(log.SchedNode).Warn("Node update triggered over allocated node",
zap.Stringer("available", sn.availableResource),
zap.Stringer("total", sn.totalResource),
zap.Stringer("occupied", sn.occupiedResource),
zap.Stringer("allocated", sn.allocatedResource))
// Return the allocation based on the allocationKey of the allocation.
// returns nil if the allocation is not found
func (sn *Node) GetAllocation(allocationKey string) *Allocation {
defer sn.RUnlock()
return sn.allocations[allocationKey]
// Get a copy of the allocations on this node
func (sn *Node) GetAllAllocations() []*Allocation {
defer sn.RUnlock()
arr := make([]*Allocation, 0)
for _, v := range sn.allocations {
arr = append(arr, v)
return arr
// Set the node to unschedulable.
// This will cause the node to be skipped during the scheduling cycle.
// Visible for testing only
func (sn *Node) SetSchedulable(schedulable bool) {
defer sn.notifyListeners()
defer sn.Unlock()
sn.schedulable = schedulable
// Can this node be used in scheduling.
func (sn *Node) IsSchedulable() bool {
defer sn.RUnlock()
return sn.schedulable
// Get the allocated resource on this node.
func (sn *Node) GetAllocatedResource() *resources.Resource {
defer sn.RUnlock()
return sn.allocatedResource.Clone()
// Get the available resource on this node.
func (sn *Node) GetAvailableResource() *resources.Resource {
defer sn.Unlock()
return sn.availableResource.Clone()
// GetFitInScoreForAvailableResource calculates a fit in score for "res" based on the current
// available resources, avoiding cloning. The caller must ensure that "res" cannot change while this method is running.
func (sn *Node) GetFitInScoreForAvailableResource(res *resources.Resource) float64 {
defer sn.RUnlock()
return res.FitInScore(sn.availableResource)
// Get the utilized resource on this node.
func (sn *Node) GetUtilizedResource() *resources.Resource {
total := sn.GetCapacity()
resourceAllocated := sn.GetAllocatedResource()
utilizedResource := make(map[string]resources.Quantity)
for name := range resourceAllocated.Resources {
if total.Resources[name] > 0 {
utilizedResource[name] = resources.CalculateAbsUsedCapacity(total, resourceAllocated).Resources[name]
return &resources.Resource{Resources: utilizedResource}
// FitInNode checks if the request fits in the node.
// All resources types requested must match the resource types provided by the nodes.
// A request may ask for only a subset of the types, but the node must provide at least the
// resource types requested in a larger or equal quantity as requested.
func (sn *Node) FitInNode(resRequest *resources.Resource) bool {
defer sn.RUnlock()
return sn.totalResource.FitIn(resRequest)
// Remove the allocation to the node.
// Returns nil if the allocation was not found and no changes are made. If the allocation
// is found the Allocation removed is returned. Used resources will decrease available
// will increase as per the allocation removed.
func (sn *Node) RemoveAllocation(allocationKey string) *Allocation {
defer sn.notifyListeners()
defer sn.Unlock()
alloc := sn.allocations[allocationKey]
if alloc != nil {
delete(sn.allocations, allocationKey)
sn.nodeEvents.sendAllocationRemovedEvent(alloc.allocationKey, alloc.allocatedResource)
return alloc
return nil
// AddAllocation adds the allocation to the node. Used resources will increase available will decrease.
// A nil Allocation makes no changes. Preempted resources must have been released already.
// Do a sanity check to make sure it still fits in the node and nothing has changed
func (sn *Node) AddAllocation(alloc *Allocation) bool {
if alloc == nil {
return false
defer sn.notifyListeners()
defer sn.Unlock()
// check if this still fits: it might have changed since pre-check
res := alloc.GetAllocatedResource()
if sn.availableResource.FitIn(res) {
sn.allocations[alloc.GetAllocationKey()] = alloc
sn.nodeEvents.sendAllocationAddedEvent(alloc.allocationKey, res)
return true
return false
// ReplaceAllocation replaces the placeholder with the real allocation on the node.
// The delta passed in is the difference in resource usage between placeholder and real allocation.
// It should always be a negative value or zero: it is a decrease in usage or no change
func (sn *Node) ReplaceAllocation(allocationKey string, replace *Allocation, delta *resources.Resource) {
defer sn.notifyListeners()
defer sn.Unlock()
delete(sn.allocations, allocationKey)
sn.allocations[replace.GetAllocationKey()] = replace
before := sn.allocatedResource.Clone()
// The allocatedResource and availableResource should be updated in the same way
if !before.FitIn(sn.allocatedResource) {
log.Log(log.SchedNode).Warn("unexpected increase in node usage after placeholder replacement",
zap.String("placeholder allocationKey", allocationKey),
zap.String("allocation allocationKey", replace.GetAllocationKey()),
zap.Stringer("delta", delta))
// CanAllocate checks if the proposed allocation fits in the available resources.
// If the proposed allocation does not fit false is returned.
func (sn *Node) CanAllocate(res *resources.Resource) bool {
defer sn.RUnlock()
return sn.availableResource.FitIn(res)
// Checking pre-conditions in the shim for an allocation.
func (sn *Node) preAllocateConditions(ask *AllocationAsk) bool {
return sn.preConditions(ask, true)
// Checking pre-conditions in the shim for a reservation.
func (sn *Node) preReserveConditions(ask *AllocationAsk) bool {
return sn.preConditions(ask, false)
// The pre conditions are implemented via plugins in the shim. If no plugins are implemented then
// the check will return true. If multiple plugins are implemented the first failure will stop the
// checks.
// The caller must thus not rely on all plugins being executed.
// This is a lock free call as it does not change the node and multiple predicate checks could be
// run at the same time.
func (sn *Node) preConditions(ask *AllocationAsk, allocate bool) bool {
// Check the predicates plugin (k8shim)
allocationKey := ask.GetAllocationKey()
if plugin := plugins.GetResourceManagerCallbackPlugin(); plugin != nil {
// checking predicates
if err := plugin.Predicates(&si.PredicatesArgs{
AllocationKey: allocationKey,
NodeID: sn.NodeID,
Allocate: allocate,
}); err != nil {
log.Log(log.SchedNode).Debug("running predicates failed",
zap.String("allocationKey", allocationKey),
zap.String("nodeID", sn.NodeID),
zap.Bool("allocateFlag", allocate),
// running predicates failed
msg := err.Error()
ask.LogAllocationFailure(msg, allocate)
return false
// all predicate plugins passed
return true
// preAllocateCheck checks if the node should be considered as a possible node to allocate on.
// No updates are made this only performs a pre allocate checks
func (sn *Node) preAllocateCheck(res *resources.Resource, resKey string) bool {
// cannot allocate zero or negative resource
if !resources.StrictlyGreaterThanZero(res) {
log.Log(log.SchedNode).Debug("pre alloc check: requested resource is zero",
zap.String("nodeID", sn.NodeID))
return false
// check if the node is reserved for this app/alloc
if sn.IsReserved() {
if !sn.isReservedForApp(resKey) {
log.Log(log.SchedNode).Debug("pre alloc check: node reserved for different app or ask",
zap.String("nodeID", sn.NodeID),
zap.String("resKey", resKey))
return false
defer sn.RUnlock()
// returns true/false based on if the request fits in what we have calculated
return sn.availableResource.FitIn(res)
// Return if the node has been reserved by any application
func (sn *Node) IsReserved() bool {
defer sn.RUnlock()
return len(sn.reservations) > 0
// isReservedForApp returns true if and only if the node has been reserved by the application
// NOTE: a return value of false does not mean the node is not reserved by a different app
func (sn *Node) isReservedForApp(key string) bool {
if key == "" {
return false
defer sn.RUnlock()
if strings.Contains(key, "|") {
return sn.reservations[key] != nil
// make sure matches only for the whole appID
separator := key + "|"
for resKey := range sn.reservations {
if strings.HasPrefix(resKey, separator) {
return true
return false
// Reserve the node for this application and ask combination, if not reserved yet.
// The reservation is checked against the node resources.
// If the reservation fails the function returns false, if the reservation is made it returns true.
func (sn *Node) Reserve(app *Application, ask *AllocationAsk) error {
defer sn.notifyListeners()
defer sn.Unlock()
if len(sn.reservations) > 0 {
return fmt.Errorf("node is already reserved, nodeID %s", sn.NodeID)
appReservation := newReservation(sn, app, ask, false)
// this should really not happen just guard against panic
// either app or ask are nil
if appReservation == nil {
log.Log(log.SchedNode).Debug("reservation creation failed unexpectedly",
zap.String("nodeID", sn.NodeID),
zap.Any("app", app),
zap.Any("ask", ask))
return fmt.Errorf("reservation creation failed app or ask are nil on nodeID %s", sn.NodeID)
// reservation must fit on the empty node
if !sn.totalResource.FitIn(ask.GetAllocatedResource()) {
log.Log(log.SchedNode).Debug("reservation does not fit on the node",
zap.String("nodeID", sn.NodeID),
zap.String("appID", app.ApplicationID),
zap.String("ask", ask.GetAllocationKey()),
zap.Stringer("allocationAsk", ask.GetAllocatedResource()))
return fmt.Errorf("reservation does not fit on node %s, appID %s, ask %s", sn.NodeID, app.ApplicationID, ask.GetAllocatedResource().String())
sn.reservations[appReservation.getKey()] = appReservation
sn.nodeEvents.sendReservedEvent(ask.GetAllocatedResource(), ask.GetAllocationKey())
// reservation added successfully
return nil
// unReserve the node for this application and ask combination
// If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1.
// The error is set if the reservation key cannot be generated.
func (sn *Node) unReserve(app *Application, ask *AllocationAsk) (int, error) {
defer sn.notifyListeners()
defer sn.Unlock()
resKey := reservationKey(nil, app, ask)
if resKey == "" {
log.Log(log.SchedNode).Debug("unreserve reservation key create failed unexpectedly",
zap.String("nodeID", sn.NodeID),
zap.Any("app", app),
zap.Any("ask", ask))
return 0, fmt.Errorf("reservation key failed app or ask are nil on nodeID %s", sn.NodeID)
if _, ok := sn.reservations[resKey]; ok {
delete(sn.reservations, resKey)
sn.nodeEvents.sendUnreservedEvent(ask.GetAllocatedResource(), ask.GetAllocationKey())
return 1, nil
// reservation was not found
log.Log(log.SchedNode).Debug("reservation not found while removing from node",
zap.String("nodeID", sn.NodeID),
zap.String("appID", app.ApplicationID),
zap.String("ask", ask.GetAllocationKey()))
return 0, nil
// GetReservations returns all reservation made on this node
func (sn *Node) GetReservations() []*reservation {
defer sn.Unlock()
var res []*reservation
if len(sn.reservations) > 0 {
for _, r := range sn.reservations {
res = append(res, r)
return res
// GetResourceUsageShares gets a map of name -> resource usages per type in shares (0 to 1). Can return NaN.
func (sn *Node) GetResourceUsageShares() map[string]float64 {
defer sn.RUnlock()
res := make(map[string]float64)
if sn.totalResource == nil {
// no resources present, so no usage
return res
for k, v := range sn.totalResource.Resources {
res[k] = float64(1) - (float64(sn.availableResource.Resources[k]) / float64(v))
return res
func (sn *Node) AddListener(listener NodeListener) {
defer sn.Unlock()
sn.listeners = append(sn.listeners, listener)
func (sn *Node) RemoveListener(listener NodeListener) {
defer sn.Unlock()
newListeners := make([]NodeListener, 0)
for _, entry := range sn.listeners {
if entry == listener {
newListeners = append(newListeners, entry)
sn.listeners = newListeners
// Notifies listeners of changes to this node. This method must not be called while locks are held.
func (sn *Node) notifyListeners() {
for _, listener := range sn.getListeners() {
func (sn *Node) getListeners() []NodeListener {
defer sn.RUnlock()
list := make([]NodeListener, len(sn.listeners))
copy(list, sn.listeners)
return list
func (sn *Node) SendNodeAddedEvent() {
func (sn *Node) SendNodeRemovedEvent() {