blob: dd4c1f99e144c6b62f1f2c2292b284d88aa25097 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package objects
import (
"fmt"
"strconv"
"time"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
type AllocationResult int
const (
None AllocationResult = iota
Allocated
AllocatedReserved
Reserved
Unreserved
Replaced
)
func (ar AllocationResult) String() string {
return [...]string{"None", "Allocated", "AllocatedReserved", "Reserved", "Unreserved", "Replaced"}[ar]
}
type Allocation struct {
// Read-only fields
ask *AllocationAsk
allocationKey string
applicationID string
partitionName string
taskGroupName string // task group this allocation belongs to
placeholder bool // is this a placeholder allocation
nodeID string
priority int32
tags map[string]string
allocatedResource *resources.Resource
// Mutable fields which need protection
placeholderUsed bool
createTime time.Time // the time this allocation was created
bindTime time.Time // the time this allocation was bound to a node
placeholderCreateTime time.Time
released bool
reservedNodeID string
result AllocationResult
releases []*Allocation
preempted bool
instType string
locking.RWMutex
}
func NewAllocation(nodeID string, ask *AllocationAsk) *Allocation {
var createTime time.Time
if ask.GetTag(siCommon.CreationTime) == "" {
createTime = time.Now()
} else {
createTime = ask.GetCreateTime()
}
return &Allocation{
ask: ask,
allocationKey: ask.GetAllocationKey(),
applicationID: ask.GetApplicationID(),
createTime: createTime,
bindTime: time.Now(),
nodeID: nodeID,
partitionName: common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()),
tags: ask.GetTagsClone(),
priority: ask.GetPriority(),
allocatedResource: ask.GetAllocatedResource().Clone(),
taskGroupName: ask.GetTaskGroup(),
placeholder: ask.IsPlaceholder(),
result: Allocated,
}
}
func newReservedAllocation(nodeID string, ask *AllocationAsk) *Allocation {
alloc := NewAllocation(nodeID, ask)
alloc.SetBindTime(time.Time{})
alloc.SetResult(Reserved)
return alloc
}
func newUnreservedAllocation(nodeID string, ask *AllocationAsk) *Allocation {
alloc := NewAllocation(nodeID, ask)
alloc.SetBindTime(time.Time{})
alloc.SetResult(Unreserved)
return alloc
}
// Create a new Allocation from a node recovered allocation.
// Also creates an AllocationAsk to maintain backward compatible behaviour
// This returns a nil Allocation on nil input or errors
func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
if alloc == nil {
return nil
}
// this is a safety check placeholder and task group name must be set as a combo
// order is important as task group can be set without placeholder but not the other way around
if alloc.Placeholder && alloc.TaskGroupName == "" {
log.Log(log.SchedAllocation).Debug("Allocation cannot be a placeholder without a TaskGroupName",
zap.Stringer("SI alloc", alloc))
return nil
}
creationTime, err := strconv.ParseInt(alloc.AllocationTags[siCommon.CreationTime], 10, 64)
if err != nil {
log.Log(log.SchedAllocation).Warn("CreationTime is not set on the Allocation object or invalid",
zap.String("creationTime", alloc.AllocationTags[siCommon.CreationTime]))
creationTime = -1
}
ask := &AllocationAsk{
allocationKey: alloc.AllocationKey,
applicationID: alloc.ApplicationID,
partitionName: alloc.PartitionName,
allocatedResource: resources.NewResourceFromProto(alloc.ResourcePerAlloc),
tags: CloneAllocationTags(alloc.AllocationTags),
priority: alloc.Priority,
allocated: true,
taskGroupName: alloc.TaskGroupName,
placeholder: alloc.Placeholder,
createTime: time.Unix(creationTime, 0),
allocLog: make(map[string]*AllocationLogEntry),
originator: alloc.Originator,
allowPreemptSelf: alloc.PreemptionPolicy.GetAllowPreemptSelf(),
allowPreemptOther: alloc.PreemptionPolicy.GetAllowPreemptOther(),
}
newAlloc := NewAllocation(alloc.NodeID, ask)
newAlloc.allocationKey = alloc.AllocationKey
return newAlloc
}
// Convert the Allocation into a SI object. This is a limited set of values that gets copied into the SI.
// We only use this to communicate *back* to the RM. All other fields are considered incoming fields from
// the RM into the core.
// The limited set of fields link the Allocation to an Application, Node and AllocationAsk.
func (a *Allocation) NewSIFromAllocation() *si.Allocation {
if a == nil {
return nil
}
return &si.Allocation{
NodeID: a.GetNodeID(),
ApplicationID: a.GetApplicationID(),
AllocationKey: a.GetAllocationKey(),
ResourcePerAlloc: a.GetAllocatedResource().ToProto(), // needed in tests for restore
TaskGroupName: a.GetTaskGroup(),
Placeholder: a.IsPlaceholder(),
Originator: a.GetAsk().IsOriginator(),
PreemptionPolicy: &si.PreemptionPolicy{
AllowPreemptSelf: a.GetAsk().IsAllowPreemptSelf(),
AllowPreemptOther: a.GetAsk().IsAllowPreemptOther(),
},
}
}
func (a *Allocation) String() string {
if a == nil {
return "nil allocation"
}
a.RLock()
defer a.RUnlock()
return fmt.Sprintf("applicationID=%s, allocationKey=%s, Node=%s, result=%s", a.applicationID, a.allocationKey, a.nodeID, a.result.String())
}
// GetAsk returns the ask associated with this allocation
func (a *Allocation) GetAsk() *AllocationAsk {
return a.ask
}
// GetAllocationKey returns the allocation key of this allocation
func (a *Allocation) GetAllocationKey() string {
return a.allocationKey
}
// GetApplicationID returns the application ID for this allocation
func (a *Allocation) GetApplicationID() string {
return a.applicationID
}
// GetPartitionName returns the partition name for this allocation
func (a *Allocation) GetPartitionName() string {
return a.partitionName
}
// GetTaskGroup returns the task group name for this allocation
func (a *Allocation) GetTaskGroup() string {
return a.taskGroupName
}
// GetCreateTime returns the time this allocation was created
func (a *Allocation) GetCreateTime() time.Time {
a.RLock()
defer a.RUnlock()
return a.createTime
}
func (a *Allocation) SetCreateTime(createTime time.Time) {
a.Lock()
defer a.Unlock()
a.createTime = createTime
}
// GetBindTime returns the time this allocation was created
func (a *Allocation) GetBindTime() time.Time {
a.RLock()
defer a.RUnlock()
return a.bindTime
}
func (a *Allocation) SetBindTime(bindTime time.Time) {
a.Lock()
defer a.Unlock()
a.bindTime = bindTime
}
// IsPlaceholderUsed returns whether this alloc is replacing a placeholder
func (a *Allocation) IsPlaceholderUsed() bool {
a.RLock()
defer a.RUnlock()
return a.placeholderUsed
}
// SetPlaceholderUsed sets whether this alloc is replacing a placeholder
func (a *Allocation) SetPlaceholderUsed(placeholderUsed bool) {
a.Lock()
defer a.Unlock()
a.placeholderUsed = placeholderUsed
}
// GetPlaceholderCreateTime returns the placeholder's create time for this alloc, if applicable
func (a *Allocation) GetPlaceholderCreateTime() time.Time {
a.RLock()
defer a.RUnlock()
return a.placeholderCreateTime
}
// SetPlaceholderCreateTime updates the placeholder's creation time
func (a *Allocation) SetPlaceholderCreateTime(placeholderCreateTime time.Time) {
a.Lock()
defer a.Unlock()
a.placeholderCreateTime = placeholderCreateTime
}
// IsPlaceholder returns whether the allocation is a placeholder
func (a *Allocation) IsPlaceholder() bool {
return a.placeholder
}
// GetNodeID gets the node this allocation is assigned to
func (a *Allocation) GetNodeID() string {
return a.nodeID
}
// SetInstanceType sets node instance type for this allocation
func (a *Allocation) SetInstanceType(instType string) {
a.Lock()
defer a.Unlock()
a.instType = instType
}
// GetInstanceType return the type of the instance used by this allocation
func (a *Allocation) GetInstanceType() string {
a.RLock()
defer a.RUnlock()
return a.instType
}
// GetPriority returns the priority of this allocation
func (a *Allocation) GetPriority() int32 {
return a.priority
}
// GetReservedNodeID gets the node this allocation is reserved for
func (a *Allocation) GetReservedNodeID() string {
a.RLock()
defer a.RUnlock()
return a.reservedNodeID
}
// SetReservedNodeID sets the node this allocation is reserved for
func (a *Allocation) SetReservedNodeID(reservedNodeID string) {
a.Lock()
defer a.Unlock()
a.reservedNodeID = reservedNodeID
}
// IsReleased returns the release status of the allocation
func (a *Allocation) IsReleased() bool {
a.RLock()
defer a.RUnlock()
return a.released
}
// SetReleased updates the release status of the allocation
func (a *Allocation) SetReleased(released bool) {
a.Lock()
defer a.Unlock()
a.released = released
}
// GetTagsClone returns the copy of the tags for this allocation
func (a *Allocation) GetTagsClone() map[string]string {
return CloneAllocationTags(a.tags)
}
// GetResult gets the result of this allocation
func (a *Allocation) GetResult() AllocationResult {
a.RLock()
defer a.RUnlock()
return a.result
}
// SetResult sets the result of this allocation
func (a *Allocation) SetResult(result AllocationResult) {
a.Lock()
defer a.Unlock()
a.result = result
}
// GetReleasesClone returns a clone of the release list
func (a *Allocation) GetReleasesClone() []*Allocation {
a.RLock()
defer a.RUnlock()
result := make([]*Allocation, len(a.releases))
copy(result, a.releases)
return result
}
// GetFirstRelease returns the first release for this allocation
func (a *Allocation) GetFirstRelease() *Allocation {
a.RLock()
defer a.RUnlock()
return a.releases[0]
}
// GetReleaseCount gets the number of releases associated with this allocation
func (a *Allocation) GetReleaseCount() int {
a.RLock()
defer a.RUnlock()
return len(a.releases)
}
// ClearReleases removes all releases from this allocation
func (a *Allocation) ClearReleases() {
a.Lock()
defer a.Unlock()
a.releases = nil
}
// AddRelease adds a new release to this allocation
func (a *Allocation) AddRelease(release *Allocation) {
a.Lock()
defer a.Unlock()
a.releases = append(a.releases, release)
}
func (a *Allocation) SetRelease(release *Allocation) {
a.Lock()
defer a.Unlock()
a.releases = []*Allocation{release}
}
// GetAllocatedResource returns a reference to the allocated resources for this allocation. This must be treated as read-only.
func (a *Allocation) GetAllocatedResource() *resources.Resource {
return a.allocatedResource
}
// MarkPreempted marks the allocation as preempted.
func (a *Allocation) MarkPreempted() {
a.Lock()
defer a.Unlock()
a.preempted = true
}
// IsPreempted returns whether the allocation has been marked for preemption or not.
func (a *Allocation) IsPreempted() bool {
a.RLock()
defer a.RUnlock()
return a.preempted
}
// CloneAllocationTags clones a tag map for safe copying
func CloneAllocationTags(tags map[string]string) map[string]string {
result := make(map[string]string)
for k, v := range tags {
result[k] = v
}
return result
}