| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package scheduler |
| |
| import ( |
| "context" |
| "fmt" |
| "math" |
| "strconv" |
| "strings" |
| "time" |
| |
| "github.com/looplab/fsm" |
| "go.uber.org/zap" |
| |
| "github.com/apache/yunikorn-core/pkg/common" |
| "github.com/apache/yunikorn-core/pkg/common/configs" |
| "github.com/apache/yunikorn-core/pkg/common/resources" |
| "github.com/apache/yunikorn-core/pkg/common/security" |
| "github.com/apache/yunikorn-core/pkg/locking" |
| "github.com/apache/yunikorn-core/pkg/log" |
| "github.com/apache/yunikorn-core/pkg/metrics" |
| "github.com/apache/yunikorn-core/pkg/scheduler/objects" |
| "github.com/apache/yunikorn-core/pkg/scheduler/placement" |
| "github.com/apache/yunikorn-core/pkg/scheduler/policies" |
| "github.com/apache/yunikorn-core/pkg/scheduler/ugm" |
| "github.com/apache/yunikorn-core/pkg/webservice/dao" |
| "github.com/apache/yunikorn-scheduler-interface/lib/go/si" |
| ) |
| |
| type PartitionContext struct { |
| RmID string // the RM the partition belongs to |
| Name string // name of the partition (logging mainly) |
| |
| // Private fields need protection |
| root *objects.Queue // start of the queue hierarchy |
| applications map[string]*objects.Application // applications assigned to this partition |
| completedApplications map[string]*objects.Application // completed applications from this partition |
| rejectedApplications map[string]*objects.Application // rejected applications from this partition |
| nodes objects.NodeCollection // nodes assigned to this partition |
| placementManager *placement.AppPlacementManager // placement manager for this partition |
| partitionManager *partitionManager // manager for this partition |
| stateMachine *fsm.FSM // the state of the partition for scheduling |
| stateTime time.Time // last time the state was updated (needed for cleanup) |
| userGroupCache *security.UserGroupCache // user cache per partition |
| totalPartitionResource *resources.Resource // Total node resources |
| allocations int // Number of allocations on the partition |
| reservations int // number of reservations |
| placeholderAllocations int // number of placeholder allocations |
| preemptionEnabled bool // whether preemption is enabled or not |
| |
| // The partition write lock must not be held while manipulating an application. |
| // Scheduling is running continuously as a lock free background task. Scheduling an application |
| // acquires a write lock of the application object. While holding the write lock a list of nodes is |
| // requested from the partition. This requires a read lock on the partition. |
| // If the partition write lock is held while manipulating an application, a deadlock could occur. |
| // Since application objects handle their own locks there is no requirement to hold the partition lock |
| // while manipulating the application. |
| // Similarly adding, updating or removing a node or a queue should only hold the partition write lock |
| // while manipulating the partition information not while manipulating the underlying objects. |
| locking.RWMutex |
| } |
| |
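| // newPartitionContext creates a new partition context for the given RM from the configuration. |
| // The partition name and the RM ID must both be set; the queue hierarchy is built from the config. |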
| func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) { |
| if conf.Name == "" || rmID == "" { |
| log.Log(log.SchedPartition).Info("partition cannot be created", |
| zap.String("partition name", conf.Name), |
| zap.String("rmID", rmID), |
| zap.Any("cluster context", cc)) |
| return nil, fmt.Errorf("partition cannot be created without name or RM, one is not set") |
| } |
| pc := &PartitionContext{ |
| Name: conf.Name, |
| RmID: rmID, |
| stateMachine: objects.NewObjectState(), |
| stateTime: time.Now(), |
| applications: make(map[string]*objects.Application), |
| completedApplications: make(map[string]*objects.Application), |
| nodes: objects.NewNodeCollection(conf.Name), |
| } |
| pc.partitionManager = newPartitionManager(pc, cc) |
| if err := pc.initialPartitionFromConfig(conf); err != nil { |
| return nil, err |
| } |
| |
| return pc, nil |
| } |
| |
| // initialPartitionFromConfig initialises the partition from the configuration: it builds the queue |
| // hierarchy, the placement manager, the user group cache, the node sorting policy and the preemption |
| // and user/group limit settings. |
| func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionConfig) error { |
| if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue { |
| return fmt.Errorf("partition cannot be created without root queue") |
| } |
| |
| // Set up the queue structure: root first, it should be the only queue at this level. |
| // Add the rest of the queue structure recursively |
| queueConf := conf.Queues[0] |
| var err error |
| if pc.root, err = objects.NewConfiguredQueue(queueConf, nil); err != nil { |
| return err |
| } |
| // recursively add the queues to the root |
| if err = pc.addQueue(queueConf.Queues, pc.root); err != nil { |
| return err |
| } |
| log.Log(log.SchedPartition).Info("root queue added", |
| zap.String("partitionName", pc.Name), |
| zap.String("rmID", pc.RmID)) |
| |
| // We need to pass in the locked version of the GetQueue function. |
| // Placing an application will not have a lock on the partition context. |
| pc.placementManager = placement.NewPlacementManager(conf.PlacementRules, pc.GetQueue) |
| // get the user group cache for the partition |
| // TODO get the resolver from the config |
| pc.userGroupCache = security.GetUserGroupCache("") |
| pc.updateNodeSortingPolicy(conf) |
| pc.updatePreemption(conf) |
| |
| // update limit settings: start at the root |
| return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name) |
| } |
| |
| // NOTE: this is a lock free call. It should only be called holding the PartitionContext lock. |
| func (pc *PartitionContext) updateNodeSortingPolicy(conf configs.PartitionConfig) { |
| var configuredPolicy policies.SortingPolicy |
| configuredPolicy, err := policies.SortingPolicyFromString(conf.NodeSortPolicy.Type) |
| if err != nil { |
| log.Log(log.SchedPartition).Debug("NodeSorting policy incorrectly set or unknown", |
| zap.Error(err)) |
| log.Log(log.SchedPartition).Info(fmt.Sprintf("NodeSorting policy not set, using '%s' as default", configuredPolicy)) |
| } else { |
| log.Log(log.SchedPartition).Info("NodeSorting policy set from config", |
| zap.Stringer("policyName", configuredPolicy)) |
| } |
| pc.nodes.SetNodeSortingPolicy(objects.NewNodeSortingPolicy(conf.NodeSortPolicy.Type, conf.NodeSortPolicy.ResourceWeights)) |
| } |
| |
| // NOTE: this is a lock free call. It should only be called holding the PartitionContext lock. |
| func (pc *PartitionContext) updatePreemption(conf configs.PartitionConfig) { |
| pc.preemptionEnabled = conf.Preemption.Enabled == nil || *conf.Preemption.Enabled |
| } |
| |
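| // updatePartitionDetails applies an updated partition configuration: placement rules, node sorting |
| // policy, preemption setting, the queue hierarchy and the user/group limit settings. |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |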
| func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) error { |
| // the following piece of code (before pc.Lock()) must be performed without locking |
| // to avoid lock order differences between PartitionContext and AppPlacementManager |
| if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue { |
| return fmt.Errorf("partition cannot be created without root queue") |
| } |
| log.Log(log.SchedPartition).Info("Updating placement manager rules on config reload") |
| err := pc.getPlacementManager().UpdateRules(conf.PlacementRules) |
| if err != nil { |
| log.Log(log.SchedPartition).Info("New placement rules not activated, config reload failed", zap.Error(err)) |
| return err |
| } |
| pc.updateNodeSortingPolicy(conf) |
| |
| pc.Lock() |
| defer pc.Unlock() |
| pc.updatePreemption(conf) |
| // start at the root: there is only one queue |
| queueConf := conf.Queues[0] |
| root := pc.root |
| // update the root queue |
| if err := root.ApplyConf(queueConf); err != nil { |
| return err |
| } |
| root.UpdateQueueProperties() |
| // update the rest of the queues recursively |
| if err := pc.updateQueues(queueConf.Queues, root); err != nil { |
| return err |
| } |
| // update limit settings: start at the root |
| return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name) |
| } |
| |
| // Process the config structure and create a queue info tree for this partition |
| func (pc *PartitionContext) addQueue(conf []configs.QueueConfig, parent *objects.Queue) error { |
| // create the queue at this level |
| for _, queueConf := range conf { |
| thisQueue, err := objects.NewConfiguredQueue(queueConf, parent) |
| if err != nil { |
| return err |
| } |
| // recursively create the queues below |
| if len(queueConf.Queues) > 0 { |
| err = pc.addQueue(queueConf.Queues, thisQueue) |
| if err != nil { |
| return err |
| } |
| } |
| } |
| return nil |
| } |
| |
| // Update the passed in queues and then do this recursively for the children |
| // |
| // NOTE: this is a lock free call. It should only be called holding the PartitionContext lock. |
| func (pc *PartitionContext) updateQueues(config []configs.QueueConfig, parent *objects.Queue) error { |
| // get the name of the passed in queue |
| parentPath := parent.QueuePath + configs.DOT |
| // keep track of which children we have updated |
| visited := map[string]bool{} |
| // walk over the queues recursively |
| for _, queueConfig := range config { |
| pathName := parentPath + queueConfig.Name |
| queue := pc.getQueueInternal(pathName) |
| var err error |
| if queue == nil { |
| queue, err = objects.NewConfiguredQueue(queueConfig, parent) |
| } else { |
| err = queue.ApplyConf(queueConfig) |
| } |
| if err != nil { |
| return err |
| } |
| // special call to convert to a real policy from the property |
| queue.UpdateQueueProperties() |
| if err = pc.updateQueues(queueConfig.Queues, queue); err != nil { |
| return err |
| } |
| visited[queue.Name] = true |
| } |
| // remove all children that were not visited |
| for childName, childQueue := range parent.GetCopyOfChildren() { |
| if !visited[childName] { |
| childQueue.MarkQueueForRemoval() |
| } |
| } |
| return nil |
| } |
| |
| // Mark the partition for removal from the system. |
| // This can be executed multiple times and is only effective the first time. |
| // The current cleanup sequence is "immediate". This is implemented to allow a graceful cleanup. |
| func (pc *PartitionContext) markPartitionForRemoval() { |
| if err := pc.handlePartitionEvent(objects.Remove); err != nil { |
| log.Log(log.SchedPartition).Error("failed to mark partition for deletion", |
| zap.String("partitionName", pc.Name), |
| zap.Error(err)) |
| } |
| } |
| |
| // isDraining returns true when the partition is being removed (Draining state). |
| // No new nodes or applications will be accepted while draining or stopped. |
| func (pc *PartitionContext) isDraining() bool { |
| return pc.stateMachine.Current() == objects.Draining.String() |
| } |
| |
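| // isStopped returns true when the partition is in the Stopped state. |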
| func (pc *PartitionContext) isStopped() bool { |
| return pc.stateMachine.Current() == objects.Stopped.String() |
| } |
| |
| // Handle the state event for the partition. |
| // The state machine handles the locking. |
| func (pc *PartitionContext) handlePartitionEvent(event objects.ObjectEvent) error { |
| err := pc.stateMachine.Event(context.Background(), event.String(), pc.Name) |
| if err == nil { |
| pc.stateTime = time.Now() |
| return nil |
| } |
| // handle the non-nil error returned for a same-state transition (limitation of the fsm library). |
| if err.Error() == "no transition" { |
| return nil |
| } |
| return err |
| } |
| |
| // Get the placement manager. The manager could change while we process configuration changes, |
| // so we need to lock. |
| func (pc *PartitionContext) getPlacementManager() *placement.AppPlacementManager { |
| pc.RLock() |
| defer pc.RUnlock() |
| return pc.placementManager |
| } |
| |
| // AddApplication adds a new application to the partition. |
| // Runs the placement rules for queue resolution and creates a new dynamic queue if the queue |
| // does not yet exist. |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) AddApplication(app *objects.Application) error { |
| if pc.isDraining() || pc.isStopped() { |
| return fmt.Errorf("partition %s is stopped cannot add a new application %s", pc.Name, app.ApplicationID) |
| } |
| |
| // Check if the app exists |
| appID := app.ApplicationID |
| if pc.getApplication(appID) != nil { |
| return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name) |
| } |
| |
| // Resolve the queue for this app using the placement rules |
| // We either have an error or a queue name is set on the application. |
| err := pc.getPlacementManager().PlaceApplication(app) |
| if err != nil { |
| return fmt.Errorf("failed to place application %s: %v", appID, err) |
| } |
| queueName := app.GetQueuePath() |
| |
| // lock the partition and make the last change: we need to do this before creating the queues. |
| // queue cleanup might otherwise remove the queue again before we can add the application |
| pc.Lock() |
| defer pc.Unlock() |
| // we have a queue name either from placement or direct, get the queue |
| queue := pc.getQueueInternal(queueName) |
| |
| // create the queue if necessary |
| if queue == nil { |
| if common.IsRecoveryQueue(queueName) { |
| queue, err = pc.createRecoveryQueue() |
| if err != nil { |
| return fmt.Errorf("failed to create recovery queue %s for application %s", common.RecoveryQueueFull, appID) |
| } |
| } else { |
| queue, err = pc.createQueue(queueName, app.GetUser()) |
| if err != nil { |
| return fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID) |
| } |
| } |
| } |
| |
| // check that the resolved queue is a leaf queue |
| if !queue.IsLeafQueue() { |
| return fmt.Errorf("failed to find queue %s for application %s", queueName, appID) |
| } |
| |
| // set resources based on tags, but only if the queue is dynamic (unmanaged) |
| if queue.IsManaged() { |
| log.Log(log.SchedQueue).Warn("Trying to set resources on a queue that is not an unmanaged leaf", |
| zap.String("queueName", queue.QueuePath)) |
| } else { |
| queue.SetResources(app.GetGuaranteedResource(), app.GetMaxResource()) |
| } |
| // check only for gang request |
| // - make sure the taskgroup request fits in the maximum set for the queue hierarchy |
| // - task groups should only be used in FIFO queues |
| // if the check fails remove the app from the queue again |
| if placeHolder := app.GetPlaceholderAsk(); !resources.IsZero(placeHolder) { |
| // check the queue sorting |
| if !queue.SupportTaskGroup() { |
| return fmt.Errorf("queue %s cannot run application %s with task group request: unsupported sort type", queueName, appID) |
| } |
| if maxQueue := queue.GetMaxQueueSet(); maxQueue != nil { |
| if !maxQueue.FitInMaxUndef(placeHolder) { |
| return fmt.Errorf("queue %s cannot fit application %s: task group request %s larger than max queue allocation %s", queueName, appID, placeHolder.String(), maxQueue.String()) |
| } |
| } |
| } |
| // all is OK, update the app and add it to the partition |
| app.SetQueue(queue) |
| app.SetTerminatedCallback(pc.moveTerminatedApp) |
| queue.AddApplication(app) |
| pc.applications[appID] = app |
| |
| return nil |
| } |
| |
| // Remove the application from the partition. |
| // This does not fail and handles missing app/queue/node/allocations internally |
| func (pc *PartitionContext) removeApplication(appID string) []*objects.Allocation { |
| // update the partition details, must be locked but all other updates should not hold partition lock |
| app := pc.removeAppInternal(appID) |
| if app == nil { |
| return nil |
| } |
| // Remove all asks and thus all reservations and pending resources (queue included) |
| _ = app.RemoveAllocationAsk("") |
| // Remove app from queue |
| if queue := app.GetQueue(); queue != nil { |
| queue.RemoveApplication(app) |
| } |
| // Remove all allocations |
| allocations := app.RemoveAllAllocations() |
| // Remove all allocations from node(s) (queues have been updated already) |
| if len(allocations) != 0 { |
| // track the number of allocations |
| pc.updateAllocationCount(-len(allocations)) |
| for _, alloc := range allocations { |
| currentAllocationKey := alloc.GetAllocationKey() |
| node := pc.GetNode(alloc.GetNodeID()) |
| if node == nil { |
| log.Log(log.SchedPartition).Warn("unknown node: not found in active node list", |
| zap.String("appID", appID), |
| zap.String("nodeID", alloc.GetNodeID())) |
| continue |
| } |
| if nodeAlloc := node.RemoveAllocation(currentAllocationKey); nodeAlloc == nil { |
| log.Log(log.SchedPartition).Warn("unknown allocation: not found on the node", |
| zap.String("appID", appID), |
| zap.String("allocationKey", currentAllocationKey), |
| zap.String("nodeID", alloc.GetNodeID())) |
| } |
| } |
| } |
| return allocations |
| } |
| |
| // Locked updates of the partition tracking info |
| func (pc *PartitionContext) removeAppInternal(appID string) *objects.Application { |
| pc.Lock() |
| defer pc.Unlock() |
| |
| // Remove from applications map |
| app := pc.applications[appID] |
| if app == nil { |
| return nil |
| } |
| // remove from partition then cleanup underlying objects |
| delete(pc.applications, appID) |
| return app |
| } |
| |
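| // GetApplication returns the application with the given appID tracked on this partition, |
| // or nil if it does not exist. Wrapper around the locked getApplication call. |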
| func (pc *PartitionContext) GetApplication(appID string) *objects.Application { |
| return pc.getApplication(appID) |
| } |
| |
| func (pc *PartitionContext) getApplication(appID string) *objects.Application { |
| pc.RLock() |
| defer pc.RUnlock() |
| |
| return pc.applications[appID] |
| } |
| |
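| // getRejectedApplication returns the rejected application for the given appID, or nil if it is not tracked. |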
| func (pc *PartitionContext) getRejectedApplication(appID string) *objects.Application { |
| pc.RLock() |
| defer pc.RUnlock() |
| |
| return pc.rejectedApplications[appID] |
| } |
| |
| // GetQueue returns the queue from the structure based on the fully qualified name. |
| // Wrapper around the unlocked version getQueueInternal() |
| // Visible by tests |
| func (pc *PartitionContext) GetQueue(name string) *objects.Queue { |
| pc.RLock() |
| defer pc.RUnlock() |
| return pc.getQueueInternal(name) |
| } |
| |
| // Get the queue from the structure based on the fully qualified name. |
| // The name is not syntax checked and must be valid. |
| // Returns nil if the queue is not found otherwise the queue object. |
| // |
| // NOTE: this is a lock free call. It should only be called holding the PartitionContext lock. |
| func (pc *PartitionContext) getQueueInternal(name string) *objects.Queue { |
| // start at the root |
| queue := pc.root |
| part := strings.Split(strings.ToLower(name), configs.DOT) |
| // no input or the name does not start at the root |
| if len(part) == 0 || part[0] != configs.RootQueue { |
| return nil |
| } |
| // walk over the parts going down towards the requested queue |
| for i := 1; i < len(part); i++ { |
| // if child not found break out and return |
| if queue = queue.GetChildQueue(part[i]); queue == nil { |
| break |
| } |
| } |
| return queue |
| } |
| |
| // Get the queue info for the whole queue structure to pass to the webservice |
| func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo { |
| partitionQueueDAOInfo := pc.root.GetPartitionQueueDAOInfo(true) |
| partitionQueueDAOInfo.Partition = common.GetPartitionNameWithoutClusterID(pc.Name) |
| return partitionQueueDAOInfo |
| } |
| |
| // Create the recovery queue. |
| func (pc *PartitionContext) createRecoveryQueue() (*objects.Queue, error) { |
| return objects.NewRecoveryQueue(pc.root) |
| } |
| |
| // Create a queue with full hierarchy. This is called when a new queue is created from a placement rule. |
| // The final leaf queue does not exist otherwise we would not get here. |
| // This means that at least 1 queue (a leaf queue) will be created |
| func (pc *PartitionContext) createQueue(name string, user security.UserGroup) (*objects.Queue, error) { |
| // find the queue furthest down the hierarchy that exists |
| var toCreate []string |
| if !strings.HasPrefix(name, configs.RootQueue) || !strings.Contains(name, configs.DOT) { |
| return nil, fmt.Errorf("illegal queue name passed in: %s", name) |
| } |
| current := name |
| queue := pc.getQueueInternal(current) |
| log.Log(log.SchedPartition).Debug("Checking queue creation") |
| for queue == nil { |
| toCreate = append(toCreate, current[strings.LastIndex(current, configs.DOT)+1:]) |
| current = current[0:strings.LastIndex(current, configs.DOT)] |
| queue = pc.getQueueInternal(current) |
| } |
| // Check the ACL before we really create |
| // The existing parent queue is the lowest we need to look at |
| if !queue.CheckSubmitAccess(user) { |
| return nil, fmt.Errorf("submit access to queue %s denied during create of: %s", current, name) |
| } |
| if queue.IsLeafQueue() { |
| return nil, fmt.Errorf("creation of queue %s failed parent is already a leaf: %s", name, current) |
| } |
| log.Log(log.SchedPartition).Debug("Creating queue(s)", |
| zap.String("parent", current), |
| zap.String("fullPath", name)) |
| for i := len(toCreate) - 1; i >= 0; i-- { |
| // everything is checked and there should be no errors |
| var err error |
| queue, err = objects.NewDynamicQueue(toCreate[i], i == 0, queue) |
| if err != nil { |
| log.Log(log.SchedPartition).Warn("Queue auto create failed unexpected", |
| zap.String("queueName", toCreate[i]), |
| zap.Error(err)) |
| return nil, err |
| } |
| } |
| return queue, nil |
| } |
| |
| // Get a node from the partition by nodeID. |
| func (pc *PartitionContext) GetNode(nodeID string) *objects.Node { |
| return pc.nodes.GetNode(nodeID) |
| } |
| |
| // Add the node to the partition and process the allocations that are reported by the node. |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations []*objects.Allocation) error { |
| if node == nil { |
| return fmt.Errorf("cannot add 'nil' node to partition %s", pc.Name) |
| } |
| log.Log(log.SchedPartition).Info("adding node to partition", |
| zap.String("partition", pc.Name), |
| zap.String("nodeID", node.NodeID)) |
| if pc.isDraining() || pc.isStopped() { |
| return fmt.Errorf("partition %s is stopped cannot add a new node %s", pc.Name, node.NodeID) |
| } |
| if err := pc.addNodeToList(node); err != nil { |
| return err |
| } |
| |
| // Add allocations that exist on the node when added |
| if len(existingAllocations) > 0 { |
| for current, alloc := range existingAllocations { |
| if err := pc.addAllocation(alloc); err != nil { |
| // not expecting any inflight replacements on node recovery |
| released, _ := pc.removeNode(node.NodeID) |
| log.Log(log.SchedPartition).Info("Failed to add existing allocations, changes reversed", |
| zap.String("nodeID", node.NodeID), |
| zap.Int("existingAllocations", len(existingAllocations)), |
| zap.Int("releasedAllocations", len(released)), |
| zap.Int("processingAlloc", current), |
| zap.Stringer("allocation", alloc), |
| zap.Error(err)) |
| // update failed metric, active metrics are tracked in add/remove from list |
| metrics.GetSchedulerMetrics().IncFailedNodes() |
| return err |
| } |
| } |
| } |
| return nil |
| } |
| |
| // Update the partition resources based on the change of the node information |
| func (pc *PartitionContext) updatePartitionResource(delta *resources.Resource) { |
| pc.Lock() |
| defer pc.Unlock() |
| if delta != nil { |
| if pc.totalPartitionResource == nil { |
| pc.totalPartitionResource = delta.Clone() |
| } else { |
| pc.totalPartitionResource.AddTo(delta) |
| } |
| pc.root.SetMaxResource(pc.totalPartitionResource) |
| } |
| } |
| |
| // Update the partition details when adding a node. |
| func (pc *PartitionContext) addNodeToList(node *objects.Node) error { |
| // we don't grab a lock here because we only update pc.nodes, which is internally protected |
| if err := pc.nodes.AddNode(node); err != nil { |
| return fmt.Errorf("failed to add node %s to partition %s, error: %v", node.NodeID, pc.Name, err) |
| } |
| |
| pc.addNodeResources(node) |
| return nil |
| } |
| |
| // Update metrics & resource tracking information. |
| // This locks the partition. The partition may not be locked when we process the allocation |
| // additions to the node as that takes further app, queue or node locks. |
| func (pc *PartitionContext) addNodeResources(node *objects.Node) { |
| pc.Lock() |
| defer pc.Unlock() |
| metrics.GetSchedulerMetrics().IncActiveNodes() |
| // update/set the resources available in the cluster |
| if pc.totalPartitionResource == nil { |
| pc.totalPartitionResource = node.GetCapacity().Clone() |
| } else { |
| pc.totalPartitionResource.AddTo(node.GetCapacity()) |
| } |
| pc.root.SetMaxResource(pc.totalPartitionResource) |
| log.Log(log.SchedPartition).Info("Updated available resources from added node", |
| zap.String("partitionName", pc.Name), |
| zap.String("nodeID", node.NodeID), |
| zap.Stringer("partitionResource", pc.totalPartitionResource)) |
| } |
| |
| // removeNodeFromList removes the node from the list of partition nodes. |
| func (pc *PartitionContext) removeNodeFromList(nodeID string) *objects.Node { |
| node := pc.nodes.RemoveNode(nodeID) |
| if node == nil { |
| log.Log(log.SchedPartition).Debug("node was not found, node already removed", |
| zap.String("nodeID", nodeID), |
| zap.String("partitionName", pc.Name)) |
| return nil |
| } |
| |
| // Remove node from list of tracked nodes |
| metrics.GetSchedulerMetrics().DecActiveNodes() |
| log.Log(log.SchedPartition).Info("Removed node from available partition nodes", |
| zap.String("partitionName", pc.Name), |
| zap.String("nodeID", node.NodeID)) |
| return node |
| } |
| |
| // removeNodeResources updates the partition and root queue resources as part of the node removal process. |
| // This locks the partition. |
| func (pc *PartitionContext) removeNodeResources(node *objects.Node) { |
| pc.Lock() |
| defer pc.Unlock() |
| // cleanup the available resources, partition resources cannot be nil at this point |
| pc.totalPartitionResource.SubFrom(node.GetCapacity()) |
| pc.root.SetMaxResource(pc.totalPartitionResource) |
| log.Log(log.SchedPartition).Info("Updated available resources from removed node", |
| zap.String("partitionName", pc.Name), |
| zap.String("nodeID", node.NodeID), |
| zap.Stringer("partitionResource", pc.totalPartitionResource)) |
| } |
| |
| // removeNode removes a node from the partition. It returns all released and confirmed allocations. |
| // The released allocations are all linked to the current node. |
| // The confirmed allocations are real allocations that are linked to placeholders on the current node and are linked to |
| // other nodes. |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) removeNode(nodeID string) ([]*objects.Allocation, []*objects.Allocation) { |
| log.Log(log.SchedPartition).Info("Removing node from partition", |
| zap.String("partition", pc.Name), |
| zap.String("nodeID", nodeID)) |
| |
| // remove the node: it will no longer be seen by the scheduling cycle |
| node := pc.removeNodeFromList(nodeID) |
| if node == nil { |
| return nil, nil |
| } |
| |
| // unreserve all the apps that were reserved on the node. |
| // The node is not reachable anymore unless you have the pointer. |
| for _, r := range node.GetReservations() { |
| _, app, ask := r.GetObjects() |
| pc.unReserve(app, node, ask) |
| } |
| // cleanup the allocations linked to the node. do this before changing the root queue max: otherwise if |
| // scheduling and removal of a node race on a full cluster we could cause all headroom to disappear for |
| // the time the allocations are not removed. |
| released, confirmed := pc.removeNodeAllocations(node) |
| |
| // update the resource linked to this node, all allocations are removed, queue usage should have decreased |
| pc.removeNodeResources(node) |
| return released, confirmed |
| } |
| |
| // removeNodeAllocations removes all allocations that are assigned to a node as part of the node removal. This is not part |
| // of the node object as updating the applications and queues is the only goal. Applications and queues are not accessible |
| // from the node. The removed and confirmed allocations are returned. |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*objects.Allocation, []*objects.Allocation) { |
| released := make([]*objects.Allocation, 0) |
| confirmed := make([]*objects.Allocation, 0) |
| // walk over all allocations still registered for this node |
| for _, alloc := range node.GetAllAllocations() { |
| allocationKey := alloc.GetAllocationKey() |
| // since we are not locking the node and/or application we could have had an update while processing |
| // note that we do not return the allocation if the app or allocation is not found and assume that it |
| // was already removed |
| app := pc.getApplication(alloc.GetApplicationID()) |
| if app == nil { |
| log.Log(log.SchedPartition).Info("app is not found, skipping while removing the node", |
| zap.String("appID", alloc.GetApplicationID()), |
| zap.String("nodeID", node.NodeID)) |
| continue |
| } |
| // Processing a removal while in the Completing state could race with the state change. |
| // Retrieve the queue early before a possible race. |
| queue := app.GetQueue() |
| // check for an inflight replacement. |
| if alloc.GetReleaseCount() != 0 { |
| release := alloc.GetFirstRelease() |
| // allocation to update the ask on: this needs to happen on the real alloc, never the placeholder |
| askAlloc := alloc |
| // placeholder gets handled differently from normal |
| if alloc.IsPlaceholder() { |
| // Check if the real allocation is made on the same node if not we should trigger a confirmation of |
| // the replacement. Trigger the replacement only if it is NOT on the same node. |
| // If it is on the same node we just keep going as the real allocation will be unlinked as a result of |
| // the removal of this placeholder. The ask update will trigger rescheduling later for the real alloc. |
| if alloc.GetNodeID() != release.GetNodeID() { |
| // ignore the return as that is the same as alloc, the alloc is gone after this call |
| _ = app.ReplaceAllocation(allocationKey) |
| // we need to check the resources equality |
| delta := resources.Sub(release.GetAllocatedResource(), alloc.GetAllocatedResource()) |
| // Any negative value in the delta means that at least one of the requested resource in the |
| // placeholder is larger than the real allocation. The nodes are correct the queue needs adjusting. |
| // The reverse case is handled during allocation. |
| if delta.HasNegativeValue() { |
| // this looks incorrect but the delta is negative and the result will be a real decrease |
| err := queue.IncAllocatedResource(delta, false) |
| // this should not happen as we really decrease the value |
| if err != nil { |
| log.Log(log.SchedPartition).Warn("unexpected failure during queue update: replacing placeholder", |
| zap.String("appID", alloc.GetApplicationID()), |
| zap.String("placeholderKey", alloc.GetAllocationKey()), |
| zap.String("allocationKey", release.GetAllocationKey()), |
| zap.Error(err)) |
| } |
| log.Log(log.SchedPartition).Warn("replacing placeholder: placeholder is larger than real allocation", |
| zap.String("allocationKey", release.GetAllocationKey()), |
| zap.Stringer("requested resource", release.GetAllocatedResource()), |
| zap.String("placeholderKey", alloc.GetAllocationKey()), |
| zap.Stringer("placeholder resource", alloc.GetAllocatedResource())) |
| } |
| // track what we confirm on the other node to confirm it in the shim and get it bound |
| confirmed = append(confirmed, release) |
| // the allocation is removed so add it to the list that we return |
| released = append(released, alloc) |
| log.Log(log.SchedPartition).Info("allocation removed from node and replacement confirmed", |
| zap.String("nodeID", node.NodeID), |
| zap.String("allocationKey", allocationKey), |
| zap.String("replacement nodeID", release.GetNodeID()), |
| zap.String("replacement allocationKey", release.GetAllocationKey())) |
| continue |
| } |
| askAlloc = release |
| } |
| // unlink the placeholder and allocation |
| release.ClearReleases() |
| alloc.ClearReleases() |
| // mark ask as unallocated to get it re-scheduled |
| _, err := app.DeallocateAsk(askAlloc.GetAsk().GetAllocationKey()) |
| if err == nil { |
| log.Log(log.SchedPartition).Info("inflight placeholder replacement reversed due to node removal", |
| zap.String("appID", askAlloc.GetApplicationID()), |
| zap.String("allocationKey", askAlloc.GetAsk().GetAllocationKey()), |
| zap.String("nodeID", node.NodeID), |
| zap.String("replacement allocationKey", askAlloc.GetAllocationKey())) |
| } else { |
| log.Log(log.SchedPartition).Error("node removal: repeat update failure for inflight replacement", |
| zap.String("appID", askAlloc.GetApplicationID()), |
| zap.String("allocationKey", askAlloc.GetAsk().GetAllocationKey()), |
| zap.String("nodeID", node.NodeID), |
| zap.Error(err)) |
| } |
| } |
| // check allocations on the app |
| if app.RemoveAllocation(allocationKey, si.TerminationType_UNKNOWN_TERMINATION_TYPE) == nil { |
| log.Log(log.SchedPartition).Info("allocation is not found, skipping while removing the node", |
| zap.String("allocationKey", allocationKey), |
| zap.String("appID", app.ApplicationID), |
| zap.String("nodeID", node.NodeID)) |
| continue |
| } |
| if err := queue.DecAllocatedResource(alloc.GetAllocatedResource()); err != nil { |
| log.Log(log.SchedPartition).Warn("failed to release resources from queue", |
| zap.String("appID", alloc.GetApplicationID()), |
| zap.Error(err)) |
| } |
| // remove preempted resources |
| if alloc.IsPreempted() { |
| queue.DecPreemptingResource(alloc.GetAllocatedResource()) |
| } |
| if alloc.IsPlaceholder() { |
| pc.decPhAllocationCount(1) |
| } |
| |
| // the allocation is removed so add it to the list that we return |
| released = append(released, alloc) |
| metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer() |
| log.Log(log.SchedPartition).Info("node removal: allocation removed", |
| zap.String("nodeID", node.NodeID), |
| zap.String("queueName", queue.GetQueuePath()), |
| zap.String("appID", app.ApplicationID), |
| zap.Stringer("allocation", alloc)) |
| } |
| // track the number of allocations: decrement the released allocation AND increment with the confirmed |
| pc.updateAllocationCount(len(confirmed) - len(released)) |
| return released, confirmed |
| } |
| |
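| // calculateOutstandingRequests collects the outstanding requests from the queue hierarchy. |
| // It returns nil when the root queue has no pending resources. |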
| func (pc *PartitionContext) calculateOutstandingRequests() []*objects.AllocationAsk { |
| if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) { |
| return nil |
| } |
| outstanding := make([]*objects.AllocationAsk, 0) |
| pc.root.GetQueueOutstandingRequests(&outstanding) |
| return outstanding |
| } |
| |
| // Try a regular allocation for the partition. |
| // Lock free call: all locks are taken when needed in the called functions. |
| func (pc *PartitionContext) tryAllocate() *objects.Allocation { |
| if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) { |
| // nothing to do just return |
| return nil |
| } |
| // try allocating from the root down |
| alloc := pc.root.TryAllocate(pc.GetNodeIterator, pc.GetFullNodeIterator, pc.GetNode, pc.isPreemptionEnabled()) |
| if alloc != nil { |
| return pc.allocate(alloc) |
| } |
| return nil |
| } |
| |
| // Try to process reservations for the partition. |
| // Lock free call: all locks are taken when needed in the called functions. |
| func (pc *PartitionContext) tryReservedAllocate() *objects.Allocation { |
| if pc.getReservationCount() == 0 { |
| return nil |
| } |
| if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) { |
| // nothing to do just return |
| return nil |
| } |
| // try allocating from the root down |
| alloc := pc.root.TryReservedAllocate(pc.GetNodeIterator) |
| if alloc != nil { |
| return pc.allocate(alloc) |
| } |
| return nil |
| } |
| |
| // Try to process placeholder replacements for the partition. |
| // Lock free call: all locks are taken when needed in the called functions. |
| func (pc *PartitionContext) tryPlaceholderAllocate() *objects.Allocation { |
| if pc.getPhAllocationCount() == 0 { |
| return nil |
| } |
| if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) { |
| // nothing to do just return |
| return nil |
| } |
| // try allocating from the root down |
| alloc := pc.root.TryPlaceholderAllocate(pc.GetNodeIterator, pc.GetNode) |
| if alloc != nil { |
| log.Log(log.SchedPartition).Info("scheduler replace placeholder processed", |
| zap.String("appID", alloc.GetApplicationID()), |
| zap.String("allocationKey", alloc.GetAllocationKey()), |
| zap.String("placeholder released allocationKey", alloc.GetFirstRelease().GetAllocationKey())) |
| // pass the release back to the RM via the cluster context |
| return alloc |
| } |
| return nil |
| } |
| |
| // Process the allocation and make the leftover changes in the partition. |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) allocate(alloc *objects.Allocation) *objects.Allocation { |
| // find the app make sure it still exists |
| appID := alloc.GetApplicationID() |
| app := pc.getApplication(appID) |
| if app == nil { |
| log.Log(log.SchedPartition).Info("Application was removed while allocating", |
| zap.String("appID", appID)) |
| return nil |
| } |
| // find the node make sure it still exists |
| // if the node was passed in use that ID instead of the one from the allocation |
| // the node ID is set when a reservation is allocated on a non-reserved node |
| var nodeID string |
| if alloc.GetReservedNodeID() == "" { |
| nodeID = alloc.GetNodeID() |
| } else { |
| nodeID = alloc.GetReservedNodeID() |
| log.Log(log.SchedPartition).Debug("Reservation allocated on different node", |
| zap.String("current node", alloc.GetNodeID()), |
| zap.String("reserved node", nodeID), |
| zap.String("appID", appID)) |
| } |
| node := pc.GetNode(nodeID) |
| if node == nil { |
| log.Log(log.SchedPartition).Info("Node was removed while allocating", |
| zap.String("nodeID", nodeID), |
| zap.String("appID", appID)) |
| return nil |
| } |
| alloc.SetInstanceType(node.GetInstanceType()) |
| // reservation |
| if alloc.GetResult() == objects.Reserved { |
| pc.reserve(app, node, alloc.GetAsk()) |
| return nil |
| } |
| // unreserve |
| if alloc.GetResult() == objects.Unreserved || alloc.GetResult() == objects.AllocatedReserved { |
| pc.unReserve(app, node, alloc.GetAsk()) |
| if alloc.GetResult() == objects.Unreserved { |
| return nil |
| } |
| // remove the link to the reserved node |
| alloc.SetReservedNodeID("") |
| } |
| |
| // track the number of allocations |
| pc.updateAllocationCount(1) |
| if alloc.IsPlaceholder() { |
| pc.incPhAllocationCount() |
| } |
| |
| log.Log(log.SchedPartition).Info("scheduler allocation processed", |
| zap.String("appID", alloc.GetApplicationID()), |
| zap.String("allocationKey", alloc.GetAllocationKey()), |
| zap.Stringer("allocatedResource", alloc.GetAllocatedResource()), |
| zap.Bool("placeholder", alloc.IsPlaceholder()), |
| zap.String("targetNode", alloc.GetNodeID())) |
| // pass the allocation back to the RM via the cluster context |
| return alloc |
| } |
| |
| // Process the reservation in the scheduler. |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock, |
| // as the reservation counter update below takes the partition lock. |
| func (pc *PartitionContext) reserve(app *objects.Application, node *objects.Node, ask *objects.AllocationAsk) { |
| appID := app.ApplicationID |
| // app has node already reserved cannot reserve again |
| if app.IsReservedOnNode(node.NodeID) { |
| log.Log(log.SchedPartition).Info("Application is already reserved on node", |
| zap.String("appID", appID), |
| zap.String("nodeID", node.NodeID)) |
| return |
| } |
| // all ok, add the reservation to the app, this will also reserve the node |
| if err := app.Reserve(node, ask); err != nil { |
| log.Log(log.SchedPartition).Debug("Failed to handle reservation, error during update of app", |
| zap.Error(err)) |
| return |
| } |
| |
| // add the reservation to the queue list |
| app.GetQueue().Reserve(appID) |
| pc.incReservationCount() |
| |
| log.Log(log.SchedPartition).Info("allocation ask is reserved", |
| zap.String("appID", appID), |
| zap.String("queue", app.GetQueuePath()), |
| zap.String("allocationKey", ask.GetAllocationKey()), |
| zap.String("node", node.NodeID)) |
| } |
| |
| // unReserve removes the reservation from the objects in the scheduler |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) unReserve(app *objects.Application, node *objects.Node, ask *objects.AllocationAsk) { |
| // remove the reservation of the app, this will also unReserve the node |
| var err error |
| var num int |
| if num, err = app.UnReserve(node, ask); err != nil { |
| log.Log(log.SchedPartition).Info("Failed to unreserve, error during allocate on the app", |
| zap.Error(err)) |
| return |
| } |
| // remove the reservation of the queue |
| appID := app.ApplicationID |
| app.GetQueue().UnReserve(appID, num) |
| pc.decReservationCount(num) |
| |
| log.Log(log.SchedPartition).Info("allocation ask is unreserved", |
| zap.String("appID", appID), |
| zap.String("queue", app.GetQueuePath()), |
| zap.String("allocationKey", ask.GetAllocationKey()), |
| zap.String("node", node.NodeID), |
| zap.Int("reservationsRemoved", num)) |
| } |
| |
| // Create an ordered node iterator based on the node sort policy set for this partition. |
| // The iterator is nil if there are no unreserved nodes available. |
| func (pc *PartitionContext) GetNodeIterator() objects.NodeIterator { |
| return pc.nodes.GetNodeIterator() |
| } |
| |
| // Create an ordered node iterator based on the node sort policy set for this partition. |
| // The iterator is nil if there are no nodes available. |
| func (pc *PartitionContext) GetFullNodeIterator() objects.NodeIterator { |
| return pc.nodes.GetFullNodeIterator() |
| } |
| |
| // Update the allocations counter for the partition |
| func (pc *PartitionContext) updateAllocationCount(allocs int) { |
| pc.Lock() |
| defer pc.Unlock() |
| pc.allocations += allocs |
| } |
| |
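| // GetTotalPartitionResource returns a clone of the total node resource registered for the partition. |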
| func (pc *PartitionContext) GetTotalPartitionResource() *resources.Resource { |
| pc.RLock() |
| defer pc.RUnlock() |
| |
| return pc.totalPartitionResource.Clone() |
| } |
| |
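| // GetAllocatedResource returns the total resource currently allocated on the partition (root queue usage). |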
| func (pc *PartitionContext) GetAllocatedResource() *resources.Resource { |
| pc.RLock() |
| defer pc.RUnlock() |
| |
| return pc.root.GetAllocatedResource() |
| } |
| |
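| // GetTotalAllocationCount returns the number of allocations currently tracked on the partition. |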
| func (pc *PartitionContext) GetTotalAllocationCount() int { |
| pc.RLock() |
| defer pc.RUnlock() |
| return pc.allocations |
| } |
| |
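| // GetTotalNodeCount returns the number of nodes registered with the partition. |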
| func (pc *PartitionContext) GetTotalNodeCount() int { |
| return pc.nodes.GetNodeCount() |
| } |
| |
| // GetApplications returns a slice of the current applications tracked by the partition. |
| func (pc *PartitionContext) GetApplications() []*objects.Application { |
| pc.RLock() |
| defer pc.RUnlock() |
| var appList []*objects.Application |
| for _, app := range pc.applications { |
| appList = append(appList, app) |
| } |
| return appList |
| } |
| |
| // GetCompletedApplications returns a slice of the completed applications tracked by the partition. |
| func (pc *PartitionContext) GetCompletedApplications() []*objects.Application { |
| pc.RLock() |
| defer pc.RUnlock() |
| var appList []*objects.Application |
| for _, app := range pc.completedApplications { |
| appList = append(appList, app) |
| } |
| return appList |
| } |
| |
| // GetRejectedApplications returns a slice of the rejected applications tracked by the partition. |
| func (pc *PartitionContext) GetRejectedApplications() []*objects.Application { |
| pc.RLock() |
| defer pc.RUnlock() |
| var appList []*objects.Application |
| for _, app := range pc.rejectedApplications { |
| appList = append(appList, app) |
| } |
| return appList |
| } |
| |
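| // getAppsState returns the IDs of all applications in the given map that are in the requested state. |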
| func (pc *PartitionContext) getAppsState(appMap map[string]*objects.Application, state string) []string { |
| pc.RLock() |
| defer pc.RUnlock() |
| apps := []string{} |
| for appID, app := range appMap { |
| if app.CurrentState() == state { |
| apps = append(apps, appID) |
| } |
| } |
| return apps |
| } |
| |
| // getAppsByState returns a slice of applicationIDs for the current applications filtered by state |
| // Completed and Rejected applications are tracked in a separate map and will never be included. |
| func (pc *PartitionContext) getAppsByState(state string) []string { |
| return pc.getAppsState(pc.applications, state) |
| } |
| |
| // getRejectedAppsByState returns a slice of applicationIDs for the rejected applications filtered by state. |
| func (pc *PartitionContext) getRejectedAppsByState(state string) []string { |
| return pc.getAppsState(pc.rejectedApplications, state) |
| } |
| |
| // getCompletedAppsByState returns a slice of applicationIDs for the completed applications filtered by state. |
| func (pc *PartitionContext) getCompletedAppsByState(state string) []string { |
| return pc.getAppsState(pc.completedApplications, state) |
| } |
| |
| // cleanupExpiredApps cleans up applications in the Expired state from the three tracking maps |
| func (pc *PartitionContext) cleanupExpiredApps() { |
| for _, appID := range pc.getAppsByState(objects.Expired.String()) { |
| pc.Lock() |
| delete(pc.applications, appID) |
| pc.Unlock() |
| } |
| for _, appID := range pc.getRejectedAppsByState(objects.Expired.String()) { |
| pc.Lock() |
| delete(pc.rejectedApplications, appID) |
| pc.Unlock() |
| } |
| for _, appID := range pc.getCompletedAppsByState(objects.Expired.String()) { |
| pc.Lock() |
| delete(pc.completedApplications, appID) |
| pc.Unlock() |
| } |
| } |
| |
| // GetNodes returns a slice of all nodes unfiltered from the iterator |
| func (pc *PartitionContext) GetNodes() []*objects.Node { |
| return pc.nodes.GetNodes() |
| } |
| |
| // Add an allocation to the partition/node/application/queue during node registration. |
| // Queue max allocation is not checked as the allocation is part of a new node addition. |
| // |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) addAllocation(alloc *objects.Allocation) error { |
| // cannot do anything with a nil alloc, should only happen if the shim broke things badly |
| if alloc == nil { |
| return nil |
| } |
| if pc.isStopped() { |
| return fmt.Errorf("partition %s is stopped cannot add new allocation %s", pc.Name, alloc.GetAllocationKey()) |
| } |
| |
| log.Log(log.SchedPartition).Info("adding recovered allocation", |
| zap.String("partitionName", pc.Name), |
| zap.String("appID", alloc.GetApplicationID()), |
| zap.String("allocationKey", alloc.GetAllocationKey())) |
| |
| // Check if the allocation violates any resource restriction, or is made against a |
| // non-existent application or node. |
| node := pc.GetNode(alloc.GetNodeID()) |
| if node == nil { |
| metrics.GetSchedulerMetrics().IncSchedulingError() |
| return fmt.Errorf("failed to find node %s", alloc.GetNodeID()) |
| } |
| |
| app := pc.getApplication(alloc.GetApplicationID()) |
| if app == nil { |
| metrics.GetSchedulerMetrics().IncSchedulingError() |
| return fmt.Errorf("failed to find application %s", alloc.GetApplicationID()) |
| } |
| queue := app.GetQueue() |
| |
| // Do not check if the new allocation goes beyond the queue's max resource (recursive). |
| // still handle a returned error, but it should never happen. |
| if err := queue.IncAllocatedResource(alloc.GetAllocatedResource(), true); err != nil { |
| metrics.GetSchedulerMetrics().IncSchedulingError() |
| return fmt.Errorf("cannot allocate resource from application %s: %v ", |
| alloc.GetApplicationID(), err) |
| } |
| |
| metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer() |
| node.AddAllocation(alloc) |
| alloc.SetInstanceType(node.GetInstanceType()) |
| app.RecoverAllocationAsk(alloc.GetAsk()) |
| app.AddAllocation(alloc) |
| |
| // track the number of allocations |
| pc.updateAllocationCount(1) |
| if alloc.IsPlaceholder() { |
| pc.incPhAllocationCount() |
| } |
| |
| log.Log(log.SchedPartition).Info("recovered allocation", |
| zap.String("partitionName", pc.Name), |
| zap.String("appID", alloc.GetApplicationID()), |
| zap.String("allocationKey", alloc.GetAllocationKey()), |
| zap.Bool("placeholder", alloc.IsPlaceholder())) |
| return nil |
| } |
| |
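| // convertUGI resolves the user and group information via the partition user group cache. |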
| func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool) (security.UserGroup, error) { |
| pc.RLock() |
| defer pc.RUnlock() |
| return pc.userGroupCache.ConvertUGI(ugi, forced) |
| } |
| |
| // calculateNodesResourceUsage calculates the overall node resource usage and returns a map as the result, |
| // where the key is the resource name, e.g. memory, and the value is a []int slice with 10 elements. |
| // Each element represents a range of resource usage, such as |
| // |
| // 0: 0% -> 10% |
| // 1: 10% -> 20% |
| // ... |
| // 9: 90% -> 100% |
| // |
| // The element value is the number of nodes that fall into this bucket. |
| // If slice[9] = 3, there are 3 nodes whose resource usage is in the range 90% to 100%. |
| // |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) calculateNodesResourceUsage() map[string][]int { |
| nodesCopy := pc.GetNodes() |
| mapResult := make(map[string][]int) |
| for _, node := range nodesCopy { |
| capacity := node.GetCapacity() |
| allocated := node.GetAllocatedResource() |
| for name, total := range capacity.Resources { |
| if total > 0 { |
| resourceAllocated := float64(allocated.Resources[name]) |
| // Consider over-allocated node as 100% utilized. |
| v := math.Min(resourceAllocated/float64(total), 1) |
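| // bucket index: ceil(v*10) maps usage into 1..10 (0 stays 0) and math.Dim subtracts 1 without going |
| // below 0, e.g. v = 0.45 gives ceil(4.5) = 5 and bucket 4 (40% -> 50%), v = 1 gives bucket 9 |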
| idx := int(math.Dim(math.Ceil(v*10), 1)) |
| if dist, ok := mapResult[name]; !ok { |
| newDist := make([]int, 10) |
| for i := range newDist { |
| newDist[i] = 0 |
| } |
| mapResult[name] = newDist |
| mapResult[name][idx]++ |
| } else { |
| dist[idx]++ |
| } |
| } |
| } |
| } |
| return mapResult |
| } |
| |
| // removeAllocation removes the referenced allocation(s) from the applications and nodes |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) { |
| if release == nil { |
| return nil, nil |
| } |
| appID := release.ApplicationID |
| allocationKey := release.GetAllocationKey() |
| app := pc.getApplication(appID) |
| // no app nothing to do everything should already be clean |
| if app == nil { |
| log.Log(log.SchedPartition).Info("Application not found while releasing allocation", |
| zap.String("appID", appID), |
| zap.String("allocationKey", allocationKey), |
| zap.Stringer("terminationType", release.TerminationType)) |
| return nil, nil |
| } |
| // Processing a removal while in the Completing state could race with the state change. |
| // The race occurs between removing the allocation and updating the queue after node processing. |
| // If the state change removes the queue link before we get to updating the queue after the node we |
| // leave the resources as allocated on the queue. The queue cannot be removed yet at this point as |
| // there are still allocations left. So retrieve the queue early to sidestep the race. |
| queue := app.GetQueue() |
| // temp store for allocations manipulated |
| released := make([]*objects.Allocation, 0) |
| var confirmed *objects.Allocation |
| // when allocationKey is not specified, remove all allocations from the app |
| if allocationKey == "" { |
| log.Log(log.SchedPartition).Info("remove all allocations", |
| zap.String("appID", appID)) |
| released = append(released, app.RemoveAllAllocations()...) |
| } else { |
| // if we have an allocationKey the termination type is important |
| if release.TerminationType == si.TerminationType_PLACEHOLDER_REPLACED { |
| log.Log(log.SchedPartition).Info("replacing placeholder allocation", |
| zap.String("appID", appID), |
| zap.String("allocationKey", allocationKey)) |
| if alloc := app.ReplaceAllocation(allocationKey); alloc != nil { |
| released = append(released, alloc) |
| } |
| } else { |
| log.Log(log.SchedPartition).Info("removing allocation from application", |
| zap.String("appID", appID), |
| zap.String("allocationKey", allocationKey), |
| zap.Stringer("terminationType", release.TerminationType)) |
| if alloc := app.RemoveAllocation(allocationKey, release.TerminationType); alloc != nil { |
| released = append(released, alloc) |
| } |
| } |
| } |
| |
| // all releases are collected: placeholder count needs updating for all placeholder releases |
| // regardless of what happens later |
| phReleases := 0 |
| for _, r := range released { |
| if r.IsPlaceholder() { |
| phReleases++ |
| } |
| } |
| if phReleases > 0 { |
| pc.decPhAllocationCount(phReleases) |
| } |
| |
| // for each allocation to release, update the node and queue. |
| total := resources.NewResource() |
| totalPreempting := resources.NewResource() |
| for _, alloc := range released { |
| node := pc.GetNode(alloc.GetNodeID()) |
| if node == nil { |
| log.Log(log.SchedPartition).Warn("node not found while releasing allocation", |
| zap.String("appID", appID), |
| zap.String("allocationKey", alloc.GetAllocationKey()), |
| zap.String("nodeID", alloc.GetNodeID())) |
| continue |
| } |
| if release.TerminationType == si.TerminationType_PLACEHOLDER_REPLACED { |
| confirmed = alloc.GetFirstRelease() |
| // we need to check the resources equality |
| delta := resources.Sub(confirmed.GetAllocatedResource(), alloc.GetAllocatedResource()) |
| // Any negative value in the delta means that at least one of the requested resource in the |
| // placeholder is larger than the real allocation. The node and queue need adjusting. |
| // The reverse case is handled during allocation. |
| if delta.HasNegativeValue() { |
| // This looks incorrect but the delta is negative and the result will be an increase of the |
| // total tracked. The total will later be deducted from the queue usage. |
| total.SubFrom(delta) |
| log.Log(log.SchedPartition).Warn("replacing placeholder: placeholder is larger than real allocation", |
| zap.String("allocationKey", confirmed.GetAllocationKey()), |
| zap.Stringer("requested resource", confirmed.GetAllocatedResource()), |
| zap.String("placeholderKey", alloc.GetAllocationKey()), |
| zap.Stringer("placeholder resource", alloc.GetAllocatedResource())) |
| } |
| // replacements could be on a different node and of a different size: handle all cases |
| if confirmed.GetNodeID() == alloc.GetNodeID() { |
| // this is the real swap on the node, adjust usage if needed |
| node.ReplaceAllocation(alloc.GetAllocationKey(), confirmed, delta) |
| } else { |
| // we have already added the real allocation to the new node, just remove the placeholder |
| node.RemoveAllocation(alloc.GetAllocationKey()) |
| } |
| log.Log(log.SchedPartition).Info("replacing placeholder allocation on node", |
| zap.String("nodeID", alloc.GetNodeID()), |
| zap.String("allocationKey", alloc.GetAllocationKey()), |
| zap.String("allocation nodeID", confirmed.GetNodeID())) |
| } else if node.RemoveAllocation(alloc.GetAllocationKey()) != nil { |
| // all non replacement are real removes: must update the queue usage |
| total.AddTo(alloc.GetAllocatedResource()) |
| log.Log(log.SchedPartition).Info("removing allocation from node", |
| zap.String("nodeID", alloc.GetNodeID()), |
| zap.String("allocationKey", alloc.GetAllocationKey())) |
| } |
| if alloc.IsPreempted() { |
| totalPreempting.AddTo(alloc.GetAllocatedResource()) |
| } |
| } |
| if resources.StrictlyGreaterThanZero(total) { |
| if err := queue.DecAllocatedResource(total); err != nil { |
| log.Log(log.SchedPartition).Warn("failed to release resources from queue", |
| zap.String("appID", appID), |
| zap.String("allocationKey", allocationKey), |
| zap.Error(err)) |
| } |
| } |
| if resources.StrictlyGreaterThanZero(totalPreempting) { |
| queue.DecPreemptingResource(totalPreempting) |
| } |
| |
| // if confirmed is set we can assume there is just one alloc in the released list; |
| // that allocation was already released by the shim, so clean up released |
| if confirmed != nil { |
| released = nil |
| } |
| // track the number of allocations, when we replace the result is no change |
| if allocReleases := len(released); allocReleases > 0 { |
| pc.updateAllocationCount(-allocReleases) |
| metrics.GetQueueMetrics(queue.GetQueuePath()).AddReleasedContainers(allocReleases) |
| } |
| |
| // if the termination type is TIMEOUT/PREEMPTED_BY_SCHEDULER, we don't notify the shim, |
| // because the release that is processed now is a confirmation returned by the shim to the core |
| if release.TerminationType == si.TerminationType_TIMEOUT || release.TerminationType == si.TerminationType_PREEMPTED_BY_SCHEDULER { |
| released = nil |
| } |
| return released, confirmed |
| } |
| |
| // Remove the allocation ask from the specified application |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) removeAllocationAsk(release *si.AllocationAskRelease) { |
| if release == nil { |
| return |
| } |
| appID := release.ApplicationID |
| allocKey := release.AllocationKey |
| // A timeout termination is sent by the core to the shim, ignore on return. |
| if release.TerminationType == si.TerminationType_TIMEOUT { |
| log.Log(log.SchedPartition).Debug("Ignoring ask release with termination type Timeout", |
| zap.String("appID", appID), |
| zap.String("ask", allocKey)) |
| return |
| } |
| app := pc.getApplication(appID) |
| if app == nil { |
| log.Log(log.SchedPartition).Info("Invalid ask release requested by shim", |
| zap.String("appID", appID), |
| zap.String("ask", allocKey), |
| zap.Stringer("terminationType", release.TerminationType)) |
| return |
| } |
| // remove the allocation asks from the app |
| _ = app.RemoveAllocationAsk(allocKey) |
| } |
| |
| // Add the allocation ask to the specified application |
| // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. |
| func (pc *PartitionContext) addAllocationAsk(siAsk *si.AllocationAsk) error { |
| if siAsk == nil { |
| return nil |
| } |
| app := pc.getApplication(siAsk.ApplicationID) |
| if app == nil { |
| return fmt.Errorf("failed to find application %s, for allocation ask %s", siAsk.ApplicationID, siAsk.AllocationKey) |
| } |
| // add the allocation asks to the app |
| return app.AddAllocationAsk(objects.NewAllocationAskFromSI(siAsk)) |
| } |
| |
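| // GetCurrentState returns the current state name of the partition state machine. |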
| func (pc *PartitionContext) GetCurrentState() string { |
| return pc.stateMachine.Current() |
| } |
| |
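| // GetStateTime returns the time of the last partition state transition. |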
| func (pc *PartitionContext) GetStateTime() time.Time { |
| pc.RLock() |
| defer pc.RUnlock() |
| return pc.stateTime |
| } |
| |
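| // GetNodeSortingPolicyType returns the type of the node sorting policy used by this partition. |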
| func (pc *PartitionContext) GetNodeSortingPolicyType() policies.SortingPolicy { |
| policy := pc.nodes.GetNodeSortingPolicy() |
| return policy.PolicyType() |
| } |
| |
| func (pc *PartitionContext) GetNodeSortingResourceWeights() map[string]float64 { |
| policy := pc.nodes.GetNodeSortingPolicy() |
| return policy.ResourceWeights() |
| } |
| |
| func (pc *PartitionContext) isPreemptionEnabled() bool { |
| pc.RLock() |
| defer pc.RUnlock() |
| return pc.preemptionEnabled |
| } |
| |
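| // moveTerminatedApp moves a terminated application from the active applications map to the |
| // completed applications map. It is registered as the terminated callback on the application. |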
| func (pc *PartitionContext) moveTerminatedApp(appID string) { |
| app := pc.getApplication(appID) |
| // nothing to do if the app is not found on the partition |
| if app == nil { |
| log.Log(log.SchedPartition).Debug("Application already removed from app list", |
| zap.String("appID", appID)) |
| return |
| } |
| app.UnSetQueue() |
| // build a new completedApplications map key: appending the negative Unix timestamp gives a '-' divider |
| newID := appID + strconv.FormatInt(-(time.Now()).Unix(), 10) |
| log.Log(log.SchedPartition).Info("Removing terminated application from the application list", |
| zap.String("appID", appID), |
| zap.String("app status", app.CurrentState())) |
| app.LogAppSummary(pc.RmID) |
| app.CleanupTrackedResource() |
| pc.Lock() |
| defer pc.Unlock() |
| delete(pc.applications, appID) |
| pc.completedApplications[newID] = app |
| } |
| |
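| // AddRejectedApplication rejects the application with the given message and adds it to the |
| // rejected applications map. |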
| func (pc *PartitionContext) AddRejectedApplication(rejectedApplication *objects.Application, rejectedMessage string) { |
| if err := rejectedApplication.RejectApplication(rejectedMessage); err != nil { |
| log.Log(log.SchedPartition).Warn("BUG: Unexpected failure: Application state not changed to Rejected", |
| zap.String("currentState", rejectedApplication.CurrentState()), |
| zap.Error(err)) |
| } |
| if pc.rejectedApplications == nil { |
| pc.rejectedApplications = make(map[string]*objects.Application) |
| } |
| pc.rejectedApplications[rejectedApplication.ApplicationID] = rejectedApplication |
| } |
| |
| func (pc *PartitionContext) incPhAllocationCount() { |
| pc.Lock() |
| defer pc.Unlock() |
| pc.placeholderAllocations++ |
| } |
| |
| func (pc *PartitionContext) decPhAllocationCount(num int) { |
| pc.Lock() |
| defer pc.Unlock() |
| pc.placeholderAllocations -= num |
| } |
| |
| func (pc *PartitionContext) getPhAllocationCount() int { |
| pc.RLock() |
| defer pc.RUnlock() |
| return pc.placeholderAllocations |
| } |
| |
| func (pc *PartitionContext) incReservationCount() { |
| pc.Lock() |
| defer pc.Unlock() |
| pc.reservations++ |
| } |
| |
| func (pc *PartitionContext) decReservationCount(num int) { |
| pc.Lock() |
| defer pc.Unlock() |
| pc.reservations -= num |
| } |
| |
| func (pc *PartitionContext) getReservationCount() int { |
| pc.RLock() |
| defer pc.RUnlock() |
| return pc.reservations |
| } |