/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/looplab/fsm"
	"go.uber.org/zap"
	v1 "k8s.io/api/core/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"

	"github.com/apache/yunikorn-k8shim/pkg/common"
	"github.com/apache/yunikorn-k8shim/pkg/common/constants"
	"github.com/apache/yunikorn-k8shim/pkg/common/events"
	"github.com/apache/yunikorn-k8shim/pkg/common/utils"
	"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
	"github.com/apache/yunikorn-k8shim/pkg/log"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

type Task struct {
	taskID          string
	alias           string
	applicationID   string
	application     *Application
	allocationUUID  string
	resource        *si.Resource
	pod             *v1.Pod
	podStatus       v1.PodStatus // pod status, maintained separately for efficiency reasons
	context         *Context
	nodeName        string
	createTime      time.Time
	taskGroupName   string
	placeholder     bool
	terminationType string
	originator      bool
	schedulingState TaskSchedulingState
	sm              *fsm.FSM
	lock            *sync.RWMutex
}
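
// NewTask creates a regular (non-placeholder) task for the given pod and binds
// it to the owning application and scheduler context.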
func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
	taskResource := common.GetPodResource(pod)
	return createTaskInternal(tid, app, taskResource, pod, false, "", ctx, false)
}
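
// NewTaskPlaceholder creates a placeholder task for the given pod, used by
// gang scheduling to hold resources until the real pods arrive.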
func NewTaskPlaceholder(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
	taskResource := common.GetPodResource(pod)
	return createTaskInternal(tid, app, taskResource, pod, true, "", ctx, false)
}
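
// NewFromTaskMeta creates a task from recovered or dispatched task metadata,
// preserving the placeholder flag, task group name and originator setting.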
func NewFromTaskMeta(tid string, app *Application, ctx *Context, metadata TaskMetadata, originator bool) *Task {
	taskPod := metadata.Pod
	taskResource := common.GetPodResource(taskPod)
	return createTaskInternal(
		tid,
		app,
		taskResource,
		metadata.Pod,
		metadata.Placeholder,
		metadata.TaskGroupName,
		ctx,
		originator)
}
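
// createTaskInternal builds the Task struct shared by all constructors and
// initializes its state machine; a task group name found in the pod spec
// overrides the one passed in.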
func createTaskInternal(tid string, app *Application, resource *si.Resource,
	pod *v1.Pod, placeholder bool, taskGroupName string, ctx *Context, originator bool) *Task {
	task := &Task{
		taskID:          tid,
		alias:           fmt.Sprintf("%s/%s", pod.Namespace, pod.Name),
		applicationID:   app.GetApplicationID(),
		application:     app,
		pod:             pod,
		podStatus:       *pod.Status.DeepCopy(),
		resource:        resource,
		createTime:      pod.GetCreationTimestamp().Time,
		placeholder:     placeholder,
		taskGroupName:   taskGroupName,
		originator:      originator,
		context:         ctx,
		sm:              newTaskState(),
		schedulingState: TaskSchedPending,
		lock:            &sync.RWMutex{},
	}
	if tgName := utils.GetTaskGroupFromPodSpec(pod); tgName != "" {
		task.taskGroupName = tgName
	}
	task.initialize()
	return task
}

// event handling
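
// handle drives the task state machine with the given event while holding the
// task write lock. A minimal call-site sketch (assuming an event constructor
// from this package):
//
//	ev := NewSubmitTaskEvent(task.applicationID, task.taskID)
//	if err := task.handle(ev); err != nil {
//		log.Log(log.ShimCacheTask).Error("failed to handle task event", zap.Error(err))
//	}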
func (task *Task) handle(te events.TaskEvent) error {
	task.lock.Lock()
	defer task.lock.Unlock()
	err := task.sm.Event(context.Background(), te.GetEvent(), task, te.GetArgs())
	// ignore the "no transition" error returned when an event leaves the task in
	// the same state (a limitation of the fsm library)
	if err != nil && err.Error() != "no transition" {
		return err
	}
	return nil
}
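
// canHandle reports whether the state machine can accept the given event in
// the task's current state.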
func (task *Task) canHandle(te events.TaskEvent) bool {
	task.lock.RLock()
	defer task.lock.RUnlock()
	return task.sm.Can(te.GetEvent())
}

func (task *Task) GetTaskPod() *v1.Pod {
	task.lock.RLock()
	defer task.lock.RUnlock()
	return task.pod
}

func (task *Task) GetTaskID() string {
	task.lock.RLock()
	defer task.lock.RUnlock()
	return task.taskID
}

func (task *Task) IsPlaceholder() bool {
	task.lock.RLock()
	defer task.lock.RUnlock()
	return task.placeholder
}

func (task *Task) GetTaskState() string {
	// the fsm has its own internal lock, so we don't need to hold the task's lock here
	return task.sm.Current()
}

func (task *Task) setTaskGroupName(groupName string) {
	task.lock.Lock()
	defer task.lock.Unlock()
	task.taskGroupName = groupName
}

func (task *Task) setTaskTerminationType(terminationTyp string) {
	task.lock.Lock()
	defer task.lock.Unlock()
	task.terminationType = terminationTyp
}

func (task *Task) getTaskGroupName() string {
	task.lock.RLock()
	defer task.lock.RUnlock()
	return task.taskGroupName
}

func (task *Task) getTaskAllocationUUID() string {
	task.lock.RLock()
	defer task.lock.RUnlock()
	return task.allocationUUID
}
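
// DeleteTaskPod deletes the task's own pod through the Kubernetes client;
// note that the pod argument is currently not used.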
func (task *Task) DeleteTaskPod(pod *v1.Pod) error {
	return task.context.apiProvider.GetAPIs().KubeClient.Delete(task.pod)
}

func (task *Task) UpdateTaskPodStatus(pod *v1.Pod) (*v1.Pod, error) {
	return task.context.apiProvider.GetAPIs().KubeClient.UpdateStatus(pod)
}

func (task *Task) UpdateTaskPod(pod *v1.Pod, podMutator func(pod *v1.Pod)) (*v1.Pod, error) {
	return task.context.apiProvider.GetAPIs().KubeClient.UpdatePod(pod, podMutator)
}
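
// isTerminated returns true if the task is in any of the terminated states.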
func (task *Task) isTerminated() bool {
	for _, states := range TaskStates().Terminated {
		if task.GetTaskState() == states {
			return true
		}
	}
	return false
}

// task object initialization
// normally when a task is added, the task state is New,
// but during recovery we need to init the task state according to
// the task pod status: if the pod is already terminated,
// we should mark the task as completed accordingly.
func (task *Task) initialize() {
	task.lock.Lock()
	defer task.lock.Unlock()
	// a task that needs recovery has already been scheduled by us with an
	// allocation; instead of starting from New, set the task directly to Bound.
	if utils.NeedRecovery(task.pod) {
		task.allocationUUID = string(task.pod.UID)
		task.nodeName = task.pod.Spec.NodeName
		task.sm.SetState(TaskStates().Bound)
		log.Log(log.ShimCacheTask).Info("set task as Bound",
			zap.String("appID", task.applicationID),
			zap.String("taskID", task.taskID),
			zap.String("allocationUUID", task.allocationUUID),
			zap.String("nodeName", task.nodeName))
	}
	// the task already terminated (succeeded or failed), which means it was
	// allocated, completed and its resources were already released; instead of
	// starting from New, set the task directly to Completed.
	if utils.IsPodTerminated(task.pod) {
		task.allocationUUID = string(task.pod.UID)
		task.nodeName = task.pod.Spec.NodeName
		task.sm.SetState(TaskStates().Completed)
		log.Log(log.ShimCacheTask).Info("set task as Completed",
			zap.String("appID", task.applicationID),
			zap.String("taskID", task.taskID),
			zap.String("allocationUUID", task.allocationUUID),
			zap.String("nodeName", task.nodeName))
	}
}

func (task *Task) IsOriginator() bool {
	task.lock.RLock()
	defer task.lock.RUnlock()
	return task.originator
}
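
// isPreemptSelfAllowed determines whether this task may be preempted: an
// explicit pod annotation wins, otherwise the decision falls back to the
// priority class configuration in the context.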
func (task *Task) isPreemptSelfAllowed() bool {
	value := utils.GetPodAnnotationValue(task.pod, constants.AnnotationAllowPreemption)
	switch value {
	case constants.True:
		return true
	case constants.False:
		return false
	default:
		return task.context.IsPreemptSelfAllowed(task.pod.Spec.PriorityClassName)
	}
}
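
// isPreemptOtherAllowed determines whether this task may preempt other pods,
// based on the pod's preemptionPolicy (only PreemptNever disables it).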
func (task *Task) isPreemptOtherAllowed() bool {
	policy := task.pod.Spec.PreemptionPolicy
	if policy == nil {
		return true
	}
	switch *policy {
	case v1.PreemptNever:
		return false
	case v1.PreemptLowerPriority:
		return true
	default:
		return true
	}
}

func (task *Task) SetTaskSchedulingState(state TaskSchedulingState) {
	task.lock.Lock()
	defer task.lock.Unlock()
	task.schedulingState = state
}

func (task *Task) GetTaskSchedulingState() TaskSchedulingState {
	task.lock.RLock()
	defer task.lock.RUnlock()
	return task.schedulingState
}
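
// handleSubmitTaskEvent builds the allocation request for this task and sends
// it to the scheduler core, then records the corresponding Kubernetes events.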
func (task *Task) handleSubmitTaskEvent() {
	log.Log(log.ShimCacheTask).Debug("scheduling pod",
		zap.String("podName", task.pod.Name))
	// build preemption policy
	preemptionPolicy := &si.PreemptionPolicy{
		AllowPreemptSelf:  task.isPreemptSelfAllowed(),
		AllowPreemptOther: task.isPreemptOtherAllowed(),
	}
	// convert the request
	rr := common.CreateAllocationRequestForTask(
		task.applicationID,
		task.taskID,
		task.resource,
		task.placeholder,
		task.taskGroupName,
		task.pod,
		task.originator,
		preemptionPolicy)
	log.Log(log.ShimCacheTask).Debug("send update request", zap.Stringer("request", rr))
	if err := task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != nil {
		log.Log(log.ShimCacheTask).Debug("failed to send scheduling request to scheduler", zap.Error(err))
		return
	}
	events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling",
		"%s is queued and waiting for allocation", task.alias)
	// if this task belongs to a task group, that means the app has gang scheduling enabled
	// in this case, post an event to indicate the task is being gang scheduled
	if !task.placeholder && task.taskGroupName != "" {
		events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
			v1.EventTypeNormal, "GangScheduling", "GangScheduling",
			"Pod belongs to the taskGroup %s, it will be scheduled as a gang member", task.taskGroupName)
	}
}

// postTaskPending is called after the task reaches the Pending state; it
// submits the resource ask of this task to the scheduler core.
func (task *Task) postTaskPending() {
	dispatcher.Dispatch(NewSubmitTaskEvent(task.applicationID, task.taskID))
}

// postTaskAllocated is called after the task reaches the Allocated state.
// It binds the pod to the allocated node via the Kubernetes API. Because the
// bind may take some time, it runs in a background goroutine so the main
// process is not blocked. The result of the binding is tracked and failures
// are handled: on success the task moves to the Bound state, otherwise the
// task is failed.
func (task *Task) postTaskAllocated() {
	go func() {
		// obtain the task's lock first to ensure no other thread is modifying
		// the task state at the same time
		task.lock.Lock()
		defer task.lock.Unlock()
		// plugin mode means we delegate this work to the default scheduler
		if utils.IsPluginMode() {
			log.Log(log.ShimCacheTask).Debug("allocating pod",
				zap.String("podName", task.pod.Name),
				zap.String("podUID", string(task.pod.UID)))
			task.context.AddPendingPodAllocation(string(task.pod.UID), task.nodeName)
			dispatcher.Dispatch(NewBindTaskEvent(task.applicationID, task.taskID))
			events.GetRecorder().Eventf(task.pod.DeepCopy(),
				nil, v1.EventTypeNormal, "QuotaApproved", "QuotaApproved",
				"Pod %s is ready for scheduling on node %s", task.alias, task.nodeName)
		} else {
			// post a message to indicate the pod gets its allocation
			events.GetRecorder().Eventf(task.pod.DeepCopy(),
				nil, v1.EventTypeNormal, "Scheduled", "Scheduled",
				"Successfully assigned %s to node %s", task.alias, task.nodeName)
			// before binding the pod to the node, first bind volumes to the pod
			log.Log(log.ShimCacheTask).Debug("bind pod volumes",
				zap.String("podName", task.pod.Name),
				zap.String("podUID", string(task.pod.UID)))
			if task.context.apiProvider.GetAPIs().VolumeBinder != nil {
				if err := task.context.bindPodVolumes(task.pod); err != nil {
					errorMessage := fmt.Sprintf("bind volumes to pod failed, name: %s, %s", task.alias, err.Error())
					dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, errorMessage))
					events.GetRecorder().Eventf(task.pod.DeepCopy(),
						nil, v1.EventTypeWarning, "PodVolumesBindFailure", "PodVolumesBindFailure", errorMessage)
					return
				}
			}
			log.Log(log.ShimCacheTask).Debug("bind pod",
				zap.String("podName", task.pod.Name),
				zap.String("podUID", string(task.pod.UID)))
			if err := task.context.apiProvider.GetAPIs().KubeClient.Bind(task.pod, task.nodeName); err != nil {
				errorMessage := fmt.Sprintf("bind pod to node failed, name: %s, %s", task.alias, err.Error())
				log.Log(log.ShimCacheTask).Error(errorMessage)
				dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, errorMessage))
				events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
					v1.EventTypeWarning, "PodBindFailure", "PodBindFailure", errorMessage)
				return
			}
			log.Log(log.ShimCacheTask).Info("successfully bound pod", zap.String("podName", task.pod.Name))
			dispatcher.Dispatch(NewBindTaskEvent(task.applicationID, task.taskID))
			events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
				v1.EventTypeNormal, "PodBindSuccessful", "PodBindSuccessful",
				"Pod %s is successfully bound to node %s", task.alias, task.nodeName)
		}
		task.schedulingState = TaskSchedAllocated
	}()
}

// beforeTaskAllocated is called before handling the TaskAllocated event.
// It stores the allocation information returned by the core on the task.
// In some cases the task is canceled (e.g. the pod was deleted) before we
// process the allocation from the core; such tasks will already be in the
// Completed state. If we find the task in Completed state while handling the
// TaskAllocated event, we need to explicitly release this allocation because
// it is no longer valid.
func (task *Task) beforeTaskAllocated(eventSrc string, allocUUID string, nodeID string) {
	// the task is allocated on a node with a UUID; record the details in the
	// task here to allow referencing them later
	task.allocationUUID = allocUUID
	task.nodeName = nodeID
	// If the task is Completed, the pod was deleted on K8s but the core was not aware yet.
	// Notify the core to release this allocation to avoid a resource leak.
	// The ask is not relevant at this point.
	if eventSrc == TaskStates().Completed {
		log.Log(log.ShimCacheTask).Info("task is already completed, invalidate the allocation",
			zap.String("currentTaskState", eventSrc),
			zap.String("allocUUID", allocUUID),
			zap.String("allocatedNode", nodeID))
		task.releaseAllocation()
	}
}
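
// postTaskBound is called after the task reaches the Bound state. In plugin
// mode it touches the pod so the default scheduler re-evaluates it; for
// placeholder tasks it notifies the application to update its reservations.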
func (task *Task) postTaskBound() {
	if utils.IsPluginMode() {
		// When the pod is actively scheduled by YuniKorn, it can be moved to the default-scheduler's
		// UnschedulablePods structure. If the pod does not change, the pod will stay in the UnschedulablePods
		// structure for podMaxInUnschedulablePodsDuration (default 5 minutes). Here we update the pod
		// explicitly to move it back to the active queue.
		// See: pkg/scheduler/internal/queue/scheduling_queue.go:isPodUpdated() for what is considered updated.
		podCopy := task.pod.DeepCopy()
		if _, err := task.UpdateTaskPod(podCopy, func(pod *v1.Pod) {
			// Ensure that the default scheduler detects the pod as having changed
			if pod.Annotations == nil {
				pod.Annotations = make(map[string]string)
			}
			pod.Annotations[constants.DomainYuniKorn+"scheduled-at"] = strconv.FormatInt(time.Now().UnixNano(), 10)
		}); err != nil {
			log.Log(log.ShimCacheTask).Warn("failed to update pod status", zap.Error(err))
		}
	}
	if task.placeholder {
		log.Log(log.ShimCacheTask).Info("placeholder is bound",
			zap.String("appID", task.applicationID),
			zap.String("taskName", task.alias),
			zap.String("taskGroupName", task.taskGroupName))
		dispatcher.Dispatch(NewUpdateApplicationReservationEvent(task.applicationID))
	}
}

func (task *Task) postTaskRejected() {
	// currently, once a task is rejected by the scheduler, we move it directly
	// to the Failed state, so this function simply triggers that transition;
	// a retry mechanism could be introduced later if necessary
	dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
		fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias)))
	events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
		v1.EventTypeWarning, "TaskRejected", "TaskRejected",
		"Task %s is rejected by the scheduler", task.alias)
}

// beforeTaskFail releases the allocation or ask from the scheduler core.
// This is done as a before hook because the releaseAllocation() call needs to
// send different requests to the scheduler core, depending on the current task state.
func (task *Task) beforeTaskFail() {
	task.releaseAllocation()
}

func (task *Task) postTaskFailed(reason string) {
	log.Log(log.ShimCacheTask).Error("task failed",
		zap.String("appID", task.applicationID),
		zap.String("taskID", task.taskID),
		zap.String("reason", reason))
	events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
		v1.EventTypeNormal, "TaskFailed", "TaskFailed",
		"Task %s is failed", task.alias)
}

// beforeTaskCompleted releases the allocation or ask from the scheduler core.
// This is done as a before hook because the releaseAllocation() call needs to
// send different requests to the scheduler core, depending on the current task state.
func (task *Task) beforeTaskCompleted() {
	task.releaseAllocation()
	events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
		v1.EventTypeNormal, "TaskCompleted", "TaskCompleted",
		"Task %s is completed", task.alias)
}

// releaseAllocation sends the release request for the Allocation or the AllocationAsk to the core.
func (task *Task) releaseAllocation() {
	// the scheduler API might be nil in some tests
	if task.context.apiProvider.GetAPIs().SchedulerAPI != nil {
		log.Log(log.ShimCacheTask).Debug("prepare to send release request",
			zap.String("applicationID", task.applicationID),
			zap.String("taskID", task.taskID),
			zap.String("taskAlias", task.alias),
			zap.String("allocationUUID", task.allocationUUID),
			zap.String("task", task.GetTaskState()),
			zap.String("terminationType", task.terminationType))
		// the request depends on the current task state:
		// if the task is already allocated, send an AllocationReleaseRequest;
		// if it is not allocated yet, send an AllocationAskReleaseRequest
		var releaseRequest *si.AllocationRequest
		s := TaskStates()
		switch task.GetTaskState() {
		case s.New, s.Pending, s.Scheduling, s.Rejected:
			releaseRequest = common.CreateReleaseAskRequestForTask(
				task.applicationID, task.taskID, task.application.partition)
		default:
			if task.allocationUUID == "" {
				log.Log(log.ShimCacheTask).Warn("BUG: task allocation UUID is empty on release",
					zap.String("applicationID", task.applicationID),
					zap.String("taskID", task.taskID),
					zap.String("taskAlias", task.alias),
					zap.String("task", task.GetTaskState()))
				return
			}
			releaseRequest = common.CreateReleaseAllocationRequestForTask(
				task.applicationID, task.allocationUUID, task.application.partition, task.terminationType)
		}
		if releaseRequest.Releases != nil {
			log.Log(log.ShimCacheTask).Info("releasing allocations",
				zap.Int("numOfAsksToRelease", len(releaseRequest.Releases.AllocationAsksToRelease)),
				zap.Int("numOfAllocationsToRelease", len(releaseRequest.Releases.AllocationsToRelease)))
		}
		if err := task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseRequest); err != nil {
			log.Log(log.ShimCacheTask).Debug("failed to send scheduling request to scheduler", zap.Error(err))
		}
	}
}

// sanityCheckBeforeScheduling performs some sanity checks before sending the task
// for scheduling; this reduces scheduling overhead by keeping requests that cannot
// be satisfied (for example, pods referencing a PVC that is being deleted) away
// from the core scheduler.
func (task *Task) sanityCheckBeforeScheduling() error {
	// check the PVCs used by the pod
	namespace := task.pod.Namespace
	manifest := &(task.pod.Spec)
	for i := range manifest.Volumes {
		volume := &manifest.Volumes[i]
		if volume.PersistentVolumeClaim == nil {
			// volume is not a PVC, ignore
			continue
		}
		pvcName := volume.PersistentVolumeClaim.ClaimName
		log.Log(log.ShimCacheTask).Debug("checking PVC", zap.String("name", pvcName))
		pvc, err := task.context.apiProvider.GetAPIs().PVCInformer.Lister().PersistentVolumeClaims(namespace).Get(pvcName)
		if err != nil {
			return err
		}
		if pvc.DeletionTimestamp != nil {
			return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
		}
	}
	return nil
}
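
// UpdatePodCondition applies the given condition to the task's cached pod
// status and reports whether anything changed, returning a pod copy that
// carries the (possibly updated) status.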
func (task *Task) UpdatePodCondition(podCondition *v1.PodCondition) (bool, *v1.Pod) {
	task.lock.Lock()
	defer task.lock.Unlock()
	status := task.podStatus.DeepCopy()
	pod := task.pod.DeepCopy()
	pod.Status = *status
	if !utils.PodUnderCondition(pod, podCondition) {
		log.Log(log.ShimContext).Debug("updating pod condition",
			zap.String("namespace", task.pod.Namespace),
			zap.String("name", task.pod.Name),
			zap.Any("podCondition", podCondition))
		if podutil.UpdatePodCondition(&task.podStatus, podCondition) {
			status = task.podStatus.DeepCopy()
			pod.Status = *status
			return true, pod
		}
	}
	return false, pod
}