blob: efac8fa133de233ebf9a2834494de4f6bbf48b85 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package objects
import (
"sort"
"sync"
"time"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// Preemption tuning values and the bit flags / thresholds used when scoring preemption solutions.
var (
// minimum time between two preemption attempts for the same ask (see CheckPreconditions)
preemptAttemptFrequency = 15 * time.Second
// batch size handed to batchPreemptionChecks; each batch of predicate checks runs concurrently
preemptCheckConcurrency = 10
// solution scores strictly below this value are treated as satisfactory (see isSatisfactory)
scoreFitMax uint64 = 1 << 32
// flag set in a score when a victim's ask is an application originator
scoreOriginator uint64 = 1 << 33
// flag set in a score when a victim's ask has not opted in to being preempted
scoreNoPreempt uint64 = 1 << 34
// score returned for results that failed or cannot be evaluated
scoreUnfit uint64 = 1 << 35
)
// Preemptor encapsulates the functionality required for preemption victim selection.
// The first group of fields is set once by NewPreemptor. The remaining maps are work structures
// that start out nil and are filled in by initQueueSnapshots and initWorkingState.
type Preemptor struct {
application *Application // application containing ask
queue *Queue // queue to preempt for
queuePath string // path of queue to preempt for
headRoom *resources.Resource // current queue headroom
preemptionDelay time.Duration // preemption delay
ask *AllocationAsk // ask to be preempted for
iterator NodeIterator // iterator to enumerate all nodes
nodesTried bool // flag indicating that scheduling has already been tried on all nodes
// lazily-populated work structures
allocationsByQueue map[string]*QueuePreemptionSnapshot // map of queue snapshots by queue path
queueByAlloc map[string]*QueuePreemptionSnapshot // map of queue snapshots by allocationKey
allocationsByNode map[string][]*Allocation // map of allocation by nodeID
nodeAvailableMap map[string]*resources.Resource // map of available resources by nodeID
}
// QueuePreemptionSnapshot is used to track a snapshot of a queue for preemption.
// Snapshots are linked to their parent snapshot; the methods on this type accept a nil receiver,
// which is how the recursion into Parent terminates.
type QueuePreemptionSnapshot struct {
Parent *QueuePreemptionSnapshot // snapshot of parent queue
QueuePath string // fully qualified path to queue
Leaf bool // true if queue is a leaf queue
AllocatedResource *resources.Resource // allocated resources
PreemptingResource *resources.Resource // resources currently flagged for preemption
MaxResource *resources.Resource // maximum resources for this queue
GuaranteedResource *resources.Resource // guaranteed resources for this queue
PotentialVictims []*Allocation // list of allocations which could be preempted
}
// NewPreemptor builds a Preemptor for the given ask. The queue and queue path are taken from the
// application; the lazily-populated work maps are left nil.
// The preemptor itself is not thread safe, and assumes the application lock is held.
func NewPreemptor(application *Application, headRoom *resources.Resource, preemptionDelay time.Duration, ask *AllocationAsk, iterator NodeIterator, nodesTried bool) *Preemptor {
	preemptor := new(Preemptor)
	preemptor.application = application
	preemptor.queue = application.queue
	preemptor.queuePath = application.queuePath
	preemptor.headRoom = headRoom
	preemptor.preemptionDelay = preemptionDelay
	preemptor.ask = ask
	preemptor.iterator = iterator
	preemptor.nodesTried = nodesTried
	return preemptor
}
// CheckPreconditions runs the cheap sanity checks that decide whether preemption should be attempted
// for the ask at all. It returns false as soon as one check fails. When every check passes, the ask's
// preemption check time is updated and true is returned.
func (p *Preemptor) CheckPreconditions() bool {
	now := time.Now()
	// cases are evaluated top to bottom, so the first failing check wins
	switch {
	case !p.ask.IsAllowPreemptOther():
		// ask is not allowed to preempt other tasks
		return false
	case p.ask.HasTriggeredPreemption():
		// ask has previously triggered preemption
		return false
	case p.ask.GetRequiredNode() != "":
		// ask requires a specific node (this should be handled by required node preemption algorithm)
		return false
	case now.Before(p.ask.GetCreateTime().Add(p.preemptionDelay)):
		// preemption delay has not yet passed
		return false
	case now.Before(p.ask.GetPreemptCheckTime().Add(preemptAttemptFrequency)):
		// attempt frequency has not been reached again
		return false
	}
	// record the check so the next scheduling cycle does not repeat the work
	p.ask.UpdatePreemptCheckTime()
	return true
}
// initQueueSnapshots takes the queue snapshots on first use; later calls are no-ops.
func (p *Preemptor) initQueueSnapshots() {
	if p.allocationsByQueue == nil {
		p.allocationsByQueue = p.queue.FindEligiblePreemptionVictims(p.queuePath, p.ask)
	}
}
// initWorkingState lazily builds the helper maps required to compute a solution: potential victims
// grouped by node, the owning queue snapshot for every victim, and the available resources of every
// usable node. A non-nil nodeAvailableMap marks the state as already built.
func (p *Preemptor) initWorkingState() {
	if p.nodeAvailableMap != nil {
		// already initialised
		return
	}
	// the victim maps below are derived from the queue snapshots
	p.initQueueSnapshots()

	// index every potential victim by node and remember which queue snapshot it came from
	victimsPerNode := make(map[string][]*Allocation)
	snapshotPerAlloc := make(map[string]*QueuePreemptionSnapshot)
	for _, snapshot := range p.allocationsByQueue {
		for _, victim := range snapshot.PotentialVictims {
			nodeID := victim.GetNodeID()
			victimsPerNode[nodeID] = append(victimsPerNode[nodeID], victim)
			snapshotPerAlloc[victim.GetAllocationKey()] = snapshot
		}
	}

	// walk the node iterator: record available resources for usable nodes, drop victims on the others
	availablePerNode := make(map[string]*resources.Resource)
	p.iterator.ForEachNode(func(node *Node) bool {
		usable := node.IsSchedulable() &&
			(!node.IsReserved() || node.isReservedForApp(reservationKey(nil, p.application, p.ask)))
		if usable {
			availablePerNode[node.NodeID] = node.GetAvailableResource()
		} else {
			// node is not available, remove any potential victims from consideration
			delete(victimsPerNode, node.NodeID)
		}
		return true
	})

	// order the victims on each node the way we would like to try them
	sortVictimsForPreemption(victimsPerNode)

	p.allocationsByNode = victimsPerNode
	p.queueByAlloc = snapshotPerAlloc
	p.nodeAvailableMap = availablePerNode
}
// checkPreemptionQueueGuarantees reports whether removing potential victims can, at some point, bring the
// ask's queue within its guaranteed resources once the ask itself has been added. It works on a private
// copy of the queue snapshots, so the preemptor's own snapshots are left untouched.
func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
	p.initQueueSnapshots()
	snapshots := p.duplicateQueueSnapshots()
	askQueue, found := snapshots[p.queuePath]
	if !found {
		log.Log(log.SchedPreemption).Warn("BUG: Didn't find current queue in snapshot list",
			zap.String("queuePath", p.queuePath))
		return false
	}

	// account for the ask as if it had been allocated
	askQueue.AddAllocation(p.ask.GetAllocatedResource())

	// release victims one at a time and stop as soon as the ask's queue fits its guarantee
	for _, victimQueue := range snapshots {
		for _, victim := range victimQueue.PotentialVictims {
			victimQueue.RemoveAllocation(victim.GetAllocatedResource())
			if askQueue.IsWithinGuaranteedResource() {
				return true
			}
		}
	}
	return false
}
// calculateVictimsByNode takes a list of potential victims for a node and builds a list ready for the RM to process.
// It returns, in this order, the index within the returned list of the first victim whose removal lets the ask
// fit on the node, and the ordered list of victims itself.
// A nil list (with index -1) means the node should not be considered for preemption. An empty, non-nil list
// (with index -1) means the ask already fits on the node without preempting anything.
// All queue accounting is done on private copies of the queue snapshots.
func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, potentialVictims []*Allocation) (int, []*Allocation) {
nodeCurrentAvailable := nodeAvailable.Clone()
allocationsByQueueSnap := p.duplicateQueueSnapshots()
// Initial check: Will allocation fit on node without preemption? This is possible if preemption was triggered due
// to queue limits and not node resource limits.
if nodeCurrentAvailable.FitIn(p.ask.GetAllocatedResource()) {
// return empty list so this node is considered for preemption
return -1, make([]*Allocation, 0)
}
// get the current queue snapshot
askQueue, ok := allocationsByQueueSnap[p.queuePath]
if !ok {
log.Log(log.SchedPreemption).Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
return -1, nil
}
// speculatively add the current ask
askQueue.AddAllocation(p.ask.GetAllocatedResource())
// First pass: Check each task to see whether we are able to reduce our shortfall by preempting each
// task in turn, and filter out tasks which will cause their queue to drop below guaranteed capacity.
// If a task could be preempted without violating queue constraints, add it to either the 'head' list or the
// 'tail' list depending on whether the shortfall is reduced. If added to the 'head' list, adjust the node available
// capacity and the queue guaranteed headroom.
head := make([]*Allocation, 0)
tail := make([]*Allocation, 0)
for _, victim := range potentialVictims {
// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
// did removing this allocation still keep the queue over-allocated?
if queueSnapshot.IsAtOrAboveGuaranteedResource() {
// check to see if the shortfall on the node has changed
shortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), nodeCurrentAvailable)
newAvailable := resources.Add(nodeCurrentAvailable, victim.GetAllocatedResource())
newShortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), newAvailable)
if resources.EqualsOrEmpty(shortfall, newShortfall) {
// shortfall did not change, so task should only be considered as a last resort
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
tail = append(tail, victim)
} else {
// shortfall was decreased, so we should keep this task on the main list and adjust usage
nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
head = append(head, victim)
}
} else {
// removing this allocation would have reduced queue below guaranteed limits, put it back
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
}
}
}
}
// merge lists
head = append(head, tail...)
if len(head) == 0 {
return -1, nil
}
// clone again: the second pass starts from the original node and queue state, not the first pass's result
nodeCurrentAvailable = nodeAvailable.Clone()
allocationsByQueueSnap = p.duplicateQueueSnapshots()
// get the current queue snapshot
askQueue, ok2 := allocationsByQueueSnap[p.queuePath]
if !ok2 {
log.Log(log.SchedPreemption).Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
return -1, nil
}
// speculatively add the current ask
askQueue.AddAllocation(p.ask.GetAllocatedResource())
// Second pass: The task ordering can no longer change. For each task, check that queue constraints would not be
// violated if the task were to be preempted. If so, discard the task. If the task can be preempted, adjust
// both the node available capacity and the queue headroom. Save the Index within the results of the first task
// which would reduce the shortfall to zero.
results := make([]*Allocation, 0)
index := -1
for _, victim := range head {
// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
if queueSnapshot.IsAtOrAboveGuaranteedResource() {
// removing task does not violate queue constraints, adjust queue and node
nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
// check if ask now fits and we haven't had this happen before
if nodeCurrentAvailable.FitInMaxUndef(p.ask.GetAllocatedResource()) && index < 0 {
index = len(results)
}
// add victim to results
results = append(results, victim)
} else {
// add back resources
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
}
}
}
}
// check to see if enough resources were freed
if index < 0 {
return -1, nil
}
return index, results
}
// duplicateQueueSnapshots returns a fresh copy of the preemptor's queue snapshots, keyed by queue path.
// Duplicate also adds each snapshot's ancestors to the result, so callers can modify the copies freely.
func (p *Preemptor) duplicateQueueSnapshots() map[string]*QueuePreemptionSnapshot {
	duplicates := make(map[string]*QueuePreemptionSnapshot)
	for _, original := range p.allocationsByQueue {
		original.Duplicate(duplicates)
	}
	return duplicates
}
// checkPreemptionPredicates calls the shim via the SI to evaluate nodes for preemption.
//
// predicateChecks is sorted in place by StartIndex (ties broken by NodeID) so that nodes expected to need
// the fewest preemptions are evaluated first. The checks are then sent to the RM callback plugin in batches of
// preemptCheckConcurrency, each batch running concurrently. Processing stops after the first batch that
// yields a satisfactory result.
//
// Returns nil when there is nothing to check or no check succeeded; otherwise the best result found, with
// its victims populated from victimsByNode. If no RM callback plugin is registered, the first check (after
// sorting) is assumed to succeed.
func (p *Preemptor) checkPreemptionPredicates(predicateChecks []*si.PreemptionPredicatesArgs, victimsByNode map[string][]*Allocation) *predicateCheckResult {
	// don't process empty list
	if len(predicateChecks) == 0 {
		return nil
	}

	// sort predicate checks by number of expected preempted tasks, then by NodeID
	sort.SliceStable(predicateChecks, func(i, j int) bool {
		left, right := predicateChecks[i], predicateChecks[j]
		if left.StartIndex != right.StartIndex {
			return left.StartIndex < right.StartIndex
		}
		return left.NodeID < right.NodeID
	})

	// check for RM callback
	plugin := plugins.GetResourceManagerCallbackPlugin()
	if plugin == nil {
		// if a plugin isn't registered, assume checks will succeed and synthesize a result
		check := predicateChecks[0]
		log.Log(log.SchedPreemption).Debug("No RM callback plugin registered, using first selected node for preemption",
			zap.String("NodeID", check.NodeID),
			zap.String("AllocationKey", check.AllocationKey))
		result := &predicateCheckResult{
			allocationKey: check.AllocationKey,
			nodeID:        check.NodeID,
			success:       true,
			index:         int(check.StartIndex),
		}
		result.populateVictims(victimsByNode)
		return result
	}

	// process each batch of checks by sending to the RM
	var bestResult *predicateCheckResult
	for _, batch := range batchPreemptionChecks(predicateChecks, preemptCheckConcurrency) {
		var wg sync.WaitGroup
		// buffered to the batch size so no worker ever blocks on send
		ch := make(chan *predicateCheckResult, len(batch))
		wg.Add(len(batch))
		for _, args := range batch {
			go preemptPredicateCheck(plugin, ch, &wg, args)
		}
		// close the channel once every worker has reported, which ends the range loop below
		go func() {
			wg.Wait()
			close(ch)
		}()
		for result := range ch {
			// only successful results compete for best
			if !result.success {
				continue
			}
			if bestResult == nil || result.betterThan(bestResult, p.allocationsByNode) {
				bestResult = result
			}
		}
		// if the best result so far meets all our criteria, don't run another batch
		// (isSatisfactory is nil-safe and reports false for a nil result)
		if bestResult.isSatisfactory(p.allocationsByNode) {
			break
		}
	}
	// populateVictims is a no-op on a nil result
	bestResult.populateVictims(victimsByNode)
	return bestResult
}
// calculateAdditionalVictims finds additional preemption victims, beyond those already chosen on the target
// node (nodeVictims), that are necessary to ensure the ask's queue ends up within its guaranteed resources.
// It returns the extra victims and true on success. It returns nil and false if the ask's queue snapshot is
// missing or the queue still is not within its guarantee after all candidates have been considered.
// All queue accounting is done on private copies of the queue snapshots.
func (p *Preemptor) calculateAdditionalVictims(nodeVictims []*Allocation) ([]*Allocation, bool) {
// clone the queue snapshots
allocationsByQueueSnap := p.duplicateQueueSnapshots()
// get the current queue snapshot
askQueue, ok := allocationsByQueueSnap[p.queuePath]
if !ok {
log.Log(log.SchedPreemption).Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
return nil, false
}
// speculatively add the current ask
askQueue.AddAllocation(p.ask.GetAllocatedResource())
// remove all victims previously chosen for the node
seen := make(map[string]*Allocation, 0)
for _, victim := range nodeVictims {
if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
seen[victim.GetAllocationKey()] = victim
}
}
}
// build and sort list of potential victims
potentialVictims := make([]*Allocation, 0)
for _, alloc := range p.allocationsByQueue {
for _, victim := range alloc.PotentialVictims {
if _, ok := seen[victim.GetAllocationKey()]; ok {
// skip already processed victim
continue
}
potentialVictims = append(potentialVictims, victim)
}
}
sort.SliceStable(potentialVictims, func(i, j int) bool {
return compareAllocationLess(potentialVictims[i], potentialVictims[j])
})
// evaluate each potential victim in turn, stopping once sufficient resources have been freed
victims := make([]*Allocation, 0)
for _, victim := range potentialVictims {
// stop search if the ask fits into the queue
if askQueue.IsWithinGuaranteedResource() {
break
}
// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
// capture the ask queue's remaining guarantee before the removal so the effect can be compared
remaining := askQueue.GetRemainingGuaranteed()
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
// did removing this allocation still keep the queue over-allocated?
if queueSnapshot.IsAtOrAboveGuaranteedResource() {
// check to see if the shortfall on the queue has changed
newRemaining := askQueue.GetRemainingGuaranteed()
if resources.EqualsOrEmpty(remaining, newRemaining) {
// remaining guaranteed amount in ask queue did not change, so preempting task won't help
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
} else {
// remaining capacity changed, so we should keep this task
victims = append(victims, victim)
}
} else {
// removing this allocation would have reduced queue below guaranteed limits, put it back
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
}
}
}
}
// the loop may end without freeing enough, so re-check before reporting success
if askQueue.IsWithinGuaranteedResource() {
return victims, true
}
return nil, false
}
// tryNodes attempts to find potential nodes for scheduling. For every node with known available resources
// the ordered victim list is computed and, where worthwhile, handed to the shim for evaluation.
// It returns the chosen node ID, the victims to preempt on that node and true, or "", nil and false when no
// solution was found.
func (p *Preemptor) tryNodes() (string, []*Allocation, bool) {
	predicateChecks := make([]*si.PreemptionPredicatesArgs, 0)
	victimsByNode := make(map[string][]*Allocation)

	for nodeID, nodeAvailable := range p.nodeAvailableMap {
		// a node without potential victims yields a nil slice here; it may still be usable for scheduling
		idx, victims := p.calculateVictimsByNode(nodeAvailable, p.allocationsByNode[nodeID])
		if victims == nil {
			// node cannot be used for preemption
			continue
		}
		victimsByNode[nodeID] = victims

		// only check this node if there are victims or we have not already tried scheduling
		if len(victims) == 0 && p.nodesTried {
			continue
		}
		keys := make([]string, 0, len(victims))
		for _, victim := range victims {
			keys = append(keys, victim.GetAllocationKey())
		}
		predicateChecks = append(predicateChecks, &si.PreemptionPredicatesArgs{
			AllocationKey:         p.ask.GetAllocationKey(),
			NodeID:                nodeID,
			PreemptAllocationKeys: keys,
			StartIndex:            int32(idx),
		})
	}

	// call predicates to evaluate each node
	best := p.checkPreemptionPredicates(predicateChecks, victimsByNode)
	if best == nil || !best.success {
		return "", nil, false
	}
	return best.nodeID, best.victims, true
}
// TryPreemption runs the full preemption attempt for the ask: it verifies that enough capacity can be freed,
// picks a node and victims, adds any extra victims needed for the queue guarantee, marks the victims as
// preempted, asks the RM to release them and returns a reserved allocation for the ask on the chosen node.
// It returns nil and false when no preemption is possible.
func (p *Preemptor) TryPreemption() (*Allocation, bool) {
	// validate that sufficient capacity can be freed
	if !p.checkPreemptionQueueGuarantees() {
		return nil, false
	}

	// ensure required data structures are populated
	p.initWorkingState()

	// try to find a node to schedule on and victims to preempt
	nodeID, victims, found := p.tryNodes()
	if !found {
		return nil, false
	}

	// look for additional victims in case we have not yet made enough capacity in the queue
	extraVictims, enough := p.calculateAdditionalVictims(victims)
	if !enough {
		return nil, false
	}
	victims = append(victims, extraVictims...)
	if len(victims) == 0 {
		return nil, false
	}

	// preempt the victims
	for _, victim := range victims {
		victimQueue := p.queue.FindQueueByAppID(victim.GetApplicationID())
		if victimQueue == nil {
			log.Log(log.SchedPreemption).Warn("BUG: Queue not found for preemption victim",
				zap.String("queue", p.queue.Name),
				zap.String("victimApplicationID", victim.GetApplicationID()),
				zap.String("victimAllocationKey", victim.GetAllocationKey()))
			continue
		}
		victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
		victim.MarkPreempted()
		log.Log(log.SchedPreemption).Info("Preempting task",
			zap.String("askApplicationID", p.ask.applicationID),
			zap.String("askAllocationKey", p.ask.allocationKey),
			zap.String("askQueue", p.queue.Name),
			zap.String("victimApplicationID", victim.GetApplicationID()),
			zap.String("victimAllocationKey", victim.GetAllocationKey()),
			zap.Stringer("victimAllocatedResource", victim.GetAllocatedResource()),
			zap.String("victimNodeID", victim.GetNodeID()),
			zap.String("victimQueue", victimQueue.Name),
		)
	}

	// mark ask as having triggered preemption so that we don't preempt again
	p.ask.MarkTriggeredPreemption()

	// notify RM that victims should be released
	releaseMessage := "preempting allocations to free up resources to run ask: " + p.ask.GetAllocationKey()
	p.application.notifyRMAllocationReleased(p.application.rmID, victims, si.TerminationType_PREEMPTED_BY_SCHEDULER, releaseMessage)

	// reserve the selected node for the new allocation
	log.Log(log.SchedPreemption).Info("Reserving node for ask after preemption",
		zap.String("allocationKey", p.ask.GetAllocationKey()),
		zap.String("nodeID", nodeID),
		zap.Int("victimCount", len(victims)))
	return newReservedAllocation(nodeID, p.ask), true
}
// predicateCheckResult holds the outcome of one predicate check for a single node.
type predicateCheckResult struct {
allocationKey string // allocation key of the ask that was checked
nodeID string // node the check was run against
success bool // true if the check passed
index int // index of the last victim that must be preempted; -1 when no victim is needed or the check failed
victims []*Allocation // victims up to and including index, filled in by populateVictims
}
// betterThan reports whether this result has a strictly lower (better) solution score than other.
func (pcr *predicateCheckResult) betterThan(other *predicateCheckResult, allocationsByNode map[string][]*Allocation) bool {
	mine := pcr.getSolutionScore(allocationsByNode)
	theirs := other.getSolutionScore(allocationsByNode)
	return mine < theirs
}
// getSolutionScore rates this result; lower is better. The score is scoreUnfit for a nil or failed result,
// for a node missing from allocationsByNode and for an index beyond the node's allocation list. It is 0
// when no preemption is needed (index < 0). Otherwise it is the number of preempted allocations
// (index + 1) plus the scoreOriginator / scoreNoPreempt flags contributed by those allocations.
func (pcr *predicateCheckResult) getSolutionScore(allocationsByNode map[string][]*Allocation) uint64 {
	if pcr == nil || !pcr.success {
		return scoreUnfit
	}
	candidates, known := allocationsByNode[pcr.nodeID]
	switch {
	case !known:
		return scoreUnfit
	case pcr.index < 0:
		// nothing needs to be preempted
		return 0
	case pcr.index >= len(candidates):
		// shouldn't happen
		return scoreUnfit
	}
	var flags uint64
	for _, allocation := range candidates[:pcr.index+1] {
		// scoreAllocation yields the originator / no-preempt flags for one allocation
		flags |= scoreAllocation(allocation)
	}
	// need to add 1 to differentiate between no preemption and preempt 1 container
	return flags + uint64(pcr.index) + 1
}
// isSatisfactory reports whether this result's solution score is below scoreFitMax, i.e. whether it is
// good enough to stop evaluating further batches. Safe to call on a nil receiver.
func (pcr *predicateCheckResult) isSatisfactory(allocationsByNode map[string][]*Allocation) bool {
	score := pcr.getSolutionScore(allocationsByNode)
	return score < scoreFitMax
}
// populateVictims fills pcr.victims with the first index+1 entries of the victim list for the result's node.
// It does nothing on a nil receiver and leaves victims nil for a failed result. If the node is missing from
// victimsByNode or the index is out of range, the result is downgraded to a failure (success=false, index=-1).
func (pcr *predicateCheckResult) populateVictims(victimsByNode map[string][]*Allocation) {
	if pcr == nil {
		return
	}
	pcr.victims = nil
	if !pcr.success {
		return
	}

	nodeVictims, known := victimsByNode[pcr.nodeID]
	switch {
	case !known:
		// abort if node was not found
		log.Log(log.SchedPreemption).Warn("BUG: Unable to find node in victim map", zap.String("nodeID", pcr.nodeID))
		pcr.success = false
		pcr.index = -1
		return
	case pcr.index >= len(nodeVictims):
		// abort if index is too large
		log.Log(log.SchedPreemption).Warn("BUG: Got invalid index into allocation list",
			zap.String("nodeID", pcr.nodeID),
			zap.Int("index", pcr.index),
			zap.Int("length", len(nodeVictims)))
		pcr.success = false
		pcr.index = -1
		return
	}

	// a negative index selects no victims but still yields an empty, non-nil list
	count := max(pcr.index+1, 0)
	pcr.victims = make([]*Allocation, 0, count)
	pcr.victims = append(pcr.victims, nodeVictims[:count]...)
}
// Duplicate creates a copy of this snapshot into the given map by queue path and returns the copy.
// Ancestors are duplicated first, so the copy's Parent points at the copied parent rather than the original.
// If the map already holds a snapshot for this queue path, that existing entry is returned unchanged.
// The resource fields are cloned; the PotentialVictims slice is shared with the original.
// Returns nil when called on a nil receiver.
func (qps *QueuePreemptionSnapshot) Duplicate(copy map[string]*QueuePreemptionSnapshot) *QueuePreemptionSnapshot {
	if qps == nil {
		return nil
	}
	if existing, ok := copy[qps.QueuePath]; ok {
		return existing
	}
	snapshot := &QueuePreemptionSnapshot{
		// Duplicate is nil-safe, so a snapshot without a parent simply gets a nil Parent
		Parent:             qps.Parent.Duplicate(copy),
		QueuePath:          qps.QueuePath,
		Leaf:               qps.Leaf,
		AllocatedResource:  qps.AllocatedResource.Clone(),
		PreemptingResource: qps.PreemptingResource.Clone(),
		MaxResource:        qps.MaxResource.Clone(),
		GuaranteedResource: qps.GuaranteedResource.Clone(),
		PotentialVictims:   qps.PotentialVictims,
	}
	copy[qps.QueuePath] = snapshot
	return snapshot
}
// IsAtOrAboveGuaranteedResource determines if this queue is exceeding resource guarantees and therefore
// may be eligible for further preemption. Usage is the allocated resources minus those already being
// preempted. A nil snapshot is never at or above its guarantee.
func (qps *QueuePreemptionSnapshot) IsAtOrAboveGuaranteedResource() bool {
	if qps == nil {
		return false
	}
	guaranteed := qps.GetGuaranteedResource()
	limit := resources.ComponentWiseMinPermissive(guaranteed, qps.GetMaxResource())
	inUse := resources.Sub(qps.AllocatedResource, qps.PreemptingResource)
	// if usage does not fit in the limit we are clearly above; otherwise we are at or above only when
	// taking the component-wise max with the guarantee leaves usage unchanged
	return !limit.FitIn(inUse) || resources.Equals(resources.ComponentWiseMax(guaranteed, inUse), inUse)
}
// IsWithinGuaranteedResource determines if this queue, and every ancestor, is within its current resource
// guarantees. A nil snapshot counts as within limits. A leaf queue for which no guaranteed resources were
// found is never within its guarantee.
func (qps *QueuePreemptionSnapshot) IsWithinGuaranteedResource() bool {
	switch {
	case qps == nil:
		return true
	case !qps.Parent.IsWithinGuaranteedResource():
		// violations at any level mean we are not within limits
		return false
	}
	guaranteed := qps.GetGuaranteedResource()
	if qps.Leaf && guaranteed.IsEmpty() {
		return false
	}
	limit := resources.ComponentWiseMinPermissive(guaranteed, qps.GetMaxResource())
	inUse := resources.Sub(qps.AllocatedResource, qps.PreemptingResource)
	return limit.FitIn(inUse)
}
// GetRemainingGuaranteed returns the guaranteed resources still unused in this queue: the guarantee
// (limited by max) minus the usage that is not already being preempted, combined with the parent's value
// through resources.ComponentWiseMin. Returns nil on a nil receiver.
func (qps *QueuePreemptionSnapshot) GetRemainingGuaranteed() *resources.Resource {
	if qps == nil {
		return nil
	}
	fromParent := qps.Parent.GetRemainingGuaranteed()
	if fromParent == nil {
		// no parent: use an empty resource for the comparison
		fromParent = resources.NewResource()
	}
	limit := resources.ComponentWiseMinPermissive(qps.GetGuaranteedResource(), qps.GetMaxResource())
	inUse := resources.Sub(qps.AllocatedResource, qps.PreemptingResource)
	return resources.ComponentWiseMin(resources.Sub(limit, inUse), fromParent)
}
// GetPreemptableResource returns the resources by which this queue exceeds its guarantee, keeping only the
// resource types with a positive excess. Returns nil on a nil receiver or when the queue has no usage.
// An empty result is returned as is; otherwise it is combined with the parent's value through
// resources.ComponentWiseMinPermissive.
func (qps *QueuePreemptionSnapshot) GetPreemptableResource() *resources.Resource {
	if qps == nil {
		return nil
	}
	parentPreemptable := qps.Parent.GetPreemptableResource()
	excess := qps.AllocatedResource.Clone()
	// No usage, so nothing to preempt
	if excess.IsEmpty() {
		return nil
	}
	// usage that is already being preempted does not count
	excess.SubOnlyExisting(qps.PreemptingResource)
	// +ve means over utilized, -ve means under utilized, 0 means correct utilization
	excess.SubOnlyExisting(qps.GuaranteedResource)
	// keep only the over-utilized resource types
	for name, quantity := range excess.Resources {
		if quantity <= 0 {
			delete(excess.Resources, name)
		}
	}
	// When nothing is preemptable in the current queue, return as is. Doing the min calculation with the
	// parent level (for different resource types) would wrongly suggest this queue as a source of victims.
	// As you move down the hierarchy, results calculated at the lower level have higher precedence.
	if excess.IsEmpty() {
		return excess
	}
	return resources.ComponentWiseMinPermissive(excess, parentPreemptable)
}
// GetRemainingGuaranteedResource returns this queue's guaranteed resources minus its usage (excluding
// resources already being preempted), combined with the parent's value through
// resources.ComponentWiseMinPermissive. Returns nil on a nil receiver, and nil when neither this queue nor
// the parent chain provides a guarantee.
func (qps *QueuePreemptionSnapshot) GetRemainingGuaranteedResource() *resources.Resource {
	if qps == nil {
		return nil
	}
	fromParent := qps.Parent.GetRemainingGuaranteedResource()
	remaining := qps.GuaranteedResource.Clone()
	// When a queue has no guarantee of its own, the value from the parent (or further ancestors) is used.
	// Only continue if at least one of the two is set.
	if !fromParent.IsEmpty() || !remaining.IsEmpty() {
		inUse := qps.AllocatedResource.Clone()
		inUse.SubOnlyExisting(qps.PreemptingResource)
		remaining.SubOnlyExisting(inUse)
		return resources.ComponentWiseMinPermissive(remaining, fromParent)
	}
	// no guarantee set anywhere up to the root, so nothing remaining
	return nil
}
// GetGuaranteedResource computes the current guaranteed resources considering parent guaranteed.
// A nil snapshot yields a new, empty resource.
func (qps *QueuePreemptionSnapshot) GetGuaranteedResource() *resources.Resource {
	if qps != nil {
		inherited := qps.Parent.GetGuaranteedResource()
		return resources.ComponentWiseMinPermissive(inherited, qps.GuaranteedResource)
	}
	return resources.NewResource()
}
// GetMaxResource computes the current max resources considering parent max.
// A nil snapshot yields a new, empty resource.
func (qps *QueuePreemptionSnapshot) GetMaxResource() *resources.Resource {
	if qps != nil {
		inherited := qps.Parent.GetMaxResource()
		return resources.ComponentWiseMinPermissive(inherited, qps.MaxResource)
	}
	return resources.NewResource()
}
// AddAllocation adds an allocation to the resource usage of this snapshot and of every ancestor,
// updating the ancestors first. No-op on a nil receiver.
func (qps *QueuePreemptionSnapshot) AddAllocation(alloc *resources.Resource) {
	if qps != nil {
		qps.Parent.AddAllocation(alloc)
		qps.AllocatedResource.AddTo(alloc)
	}
}
// RemoveAllocation removes an allocation from the resource usage of this snapshot and of every ancestor,
// updating the ancestors first. No-op on a nil receiver.
func (qps *QueuePreemptionSnapshot) RemoveAllocation(alloc *resources.Resource) {
	if qps != nil {
		qps.Parent.RemoveAllocation(alloc)
		qps.AllocatedResource.SubFrom(alloc)
	}
}
// compareAllocationLess compares two allocations for preemption and reports whether left should be tried
// before right. Allocations which have opted into preemption are considered first, then allocations which
// are not the originator of their associated application. Ties are broken by creation time, with the
// most recently created allocation first.
func compareAllocationLess(left *Allocation, right *Allocation) bool {
	if leftScore, rightScore := scoreAllocation(left), scoreAllocation(right); leftScore != rightScore {
		return leftScore < rightScore
	}
	return left.createTime.After(right.createTime)
}
// scoreAllocation generates a relative score for an allocation. Lower-scored allocations are considered more
// likely preemption candidates. The score is a combination of two flags: scoreNoPreempt when the
// allocation's ask has not opted into preemption, and scoreOriginator when the ask is an application
// originator.
func scoreAllocation(allocation *Allocation) uint64 {
	ask := allocation.GetAsk()
	var flags uint64
	if ask.IsOriginator() {
		flags |= scoreOriginator
	}
	if !ask.IsAllowPreemptSelf() {
		flags |= scoreNoPreempt
	}
	return flags
}
// sortVictimsForPreemption sorts the allocations of each node in place (stable sort), preferring those that
// have opted-in to preemption, then those that are not originating tasks for an application, and finally
// those whose ask was created most recently.
func sortVictimsForPreemption(allocationsByNode map[string][]*Allocation) {
	for _, nodeAllocations := range allocationsByNode {
		sort.SliceStable(nodeAllocations, func(i, j int) bool {
			left, right := nodeAllocations[i].GetAsk(), nodeAllocations[j].GetAsk()
			// asks which allow themselves to be preempted come first
			if leftOptIn, rightOptIn := left.IsAllowPreemptSelf(), right.IsAllowPreemptSelf(); leftOptIn != rightOptIn {
				return leftOptIn
			}
			// next those that are not app originators
			if leftOrigin, rightOrigin := left.IsOriginator(), right.IsOriginator(); leftOrigin != rightOrigin {
				return rightOrigin
			}
			// finally sort by creation time descending
			return left.GetCreateTime().After(right.GetCreateTime())
		})
	}
}
// preemptPredicateCheck performs a single predicate check and reports the result on a channel.
// With no allocations to preempt it runs the regular predicates for the node; otherwise it runs the
// preemption predicates and, on success, records the index returned by the plugin.
// Exactly one result is sent on ch, and wg.Done is called when the function returns.
func preemptPredicateCheck(plugin api.ResourceManagerCallback, ch chan<- *predicateCheckResult, wg *sync.WaitGroup, args *si.PreemptionPredicatesArgs) {
	defer wg.Done()
	// start from a failed result; only a passing check upgrades it
	outcome := &predicateCheckResult{
		allocationKey: args.AllocationKey,
		nodeID:        args.NodeID,
		success:       false,
		index:         -1,
	}
	if len(args.PreemptAllocationKeys) > 0 {
		// preemption check; at least one allocation will need preemption
		if response := plugin.PreemptionPredicates(args); response != nil && response.GetSuccess() {
			outcome.success = true
			outcome.index = int(response.GetIndex())
		}
	} else {
		// normal check; there are sufficient resources to run on this node
		err := plugin.Predicates(&si.PredicatesArgs{
			AllocationKey: args.AllocationKey,
			NodeID:        args.NodeID,
			Allocate:      true,
		})
		outcome.success = err == nil
	}
	ch <- outcome
}
// batchPreemptionChecks splits predicate checks into consecutive groups of at most batchSize entries,
// preserving order. The returned batches are sub-slices of checks, not copies.
// A batchSize below 1 is treated as 1. An empty input yields nil.
func batchPreemptionChecks(checks []*si.PreemptionPredicatesArgs, batchSize int) [][]*si.PreemptionPredicatesArgs {
	if len(checks) == 0 {
		return nil
	}
	// a zero batch size would never advance the loop, and a negative one would step the start index backwards
	batchSize = max(batchSize, 1)
	result := make([][]*si.PreemptionPredicatesArgs, 0, (len(checks)-1)/batchSize+1)
	for start := 0; start < len(checks); {
		// compare against the remaining length so start+batchSize cannot overflow
		end := len(checks)
		if batchSize < end-start {
			end = start + batchSize
		}
		result = append(result, checks[start:end])
		start = end
	}
	return result
}