blob: b752d1fef72056112f38d49136b3bf6edac70c9a [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package application
import (
"context"
"fmt"
"strings"
"time"
"github.com/apache/yunikorn-k8shim/pkg/client"
"go.uber.org/zap"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
appv1 "github.com/apache/yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
"github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
shimcache "github.com/apache/yunikorn-k8shim/pkg/cache"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
type AppManager struct {
amProtocol interfaces.ApplicationManagementProtocol
//controller *Controller
apiProvider client.APIProvider
}
const appIDDelimiter = "-"
func NewAppManager(amProtocol interfaces.ApplicationManagementProtocol, apiProvider client.APIProvider) *AppManager {
return &AppManager{
amProtocol: amProtocol,
apiProvider: apiProvider,
}
}
// this implements AppManagementService interface
func (appMgr *AppManager) Name() string {
return constants.AppManagerHandlerName
}
// this implements AppManagementService interface
func (appMgr *AppManager) ServiceInit() error {
return nil
}
// this implements AppManagementService interface
func (appMgr *AppManager) Start() error {
appMgr.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.ApplicationInformerHandlers,
AddFn: appMgr.addApp,
DeleteFn: appMgr.deleteApp,
})
return nil
}
// this implements AppManagementService interface
func (appMgr *AppManager) Stop() {
//noop
}
// handle the updates from the scheduler and sync the status change
func (appMgr *AppManager) HandleApplicationStateUpdate() func(obj interface{}) {
return func(obj interface{}) {
if event, ok := obj.(events.ApplicationEvent); ok {
if shimcache.AppStateChange.String() == event.GetEvent() {
if shimEvent, ok := event.(shimcache.ApplicationStatusChangeEvent); ok {
appID := event.GetApplicationID()
log.Logger().Info("Status Change callback received",
zap.String("app id", appID),
zap.String("new status", shimEvent.GetState()))
var app = appMgr.amProtocol.GetApplication(appID).(*shimcache.Application)
appName, err := getNameFromAppID(appID)
if err != nil {
log.Logger().Warn("Failed to handle status update",
zap.String("application ID", appID),
zap.Error(err))
}
appCRD, err := appMgr.apiProvider.GetAPIs().AppInformer.Lister().Applications(app.GetTags()[constants.AppTagNamespace]).Get(appName)
if err != nil {
log.Logger().Warn("Failed to query app CRD for status update",
zap.String("Application ID", appID),
zap.Error(err))
}
crdState := convertShimAppStateToAppCRDState(shimEvent.GetState())
if crdState != "Undefined" {
appMgr.updateAppCRDStatus(appCRD, crdState)
} else {
log.Logger().Error("Invalid status, skip saving it",
zap.String("App id", appID))
//create some error
}
log.Logger().Debug("Application status changed",
zap.String("AppID", appID))
}
}
}
}
}
/*
Remove the application from the scheduler as well
*/
func (appMgr *AppManager) deleteApp(obj interface{}) {
app, ok := obj.(*appv1.Application)
if !ok {
log.Logger().Error("obj is not an Application")
return
}
appID := constructAppID(app.Name, app.Namespace)
err := appMgr.amProtocol.RemoveApplication(appID)
if err != nil {
log.Logger().Error("Application removal failed",
zap.String("appID", appID),
zap.Error(err))
return
}
log.Logger().Debug("App CRD deleted",
zap.String("Name", appID))
}
/*
Add application to scheduler
*/
func (appMgr *AppManager) addApp(obj interface{}) {
appCRD, ok := obj.(*appv1.Application)
if !ok {
log.Logger().Error("obj is not an Application")
return
}
if appMeta, ok := appMgr.getAppMetadata(appCRD); ok {
app := appMgr.amProtocol.GetApplication(appMeta.ApplicationID)
if app == nil {
appMgr.amProtocol.AddApplication(&interfaces.AddApplicationRequest{
Metadata: appMeta,
})
// set and save status = New in case it is not set. In case of recovery don't overwrite it
if len(appCRD.Status.AppStatus) == 0 {
appMgr.updateAppCRDStatus(appCRD, appv1.NewApplicationState)
}
}
}
}
func (appMgr *AppManager) updateAppCRDStatus(appCRD *appv1.Application, status appv1.ApplicationStateType) {
if appCRD == nil {
log.Logger().Error("AppCRD is nil, there is nothing to update")
return
}
appCopy := appCRD.DeepCopy()
appCopy.Status = appv1.ApplicationStatus{
AppStatus: status,
Message: "app CRD status change",
LastUpdate: v1.NewTime(time.Now()),
}
_, err := appMgr.apiProvider.GetAPIs().AppClient.ApacheV1alpha1().Applications(appCRD.Namespace).UpdateStatus(context.Background(), appCopy, v1.UpdateOptions{})
if err != nil {
log.Logger().Error("Failed to update application CRD",
zap.String("AppId", appCopy.Name))
return
}
}
func (appMgr *AppManager) getAppMetadata(app *appv1.Application) (interfaces.ApplicationMetadata, bool) {
appID := constructAppID(app.Name, app.Namespace)
// tags will at least have defaultNamespace info
// labels or annotations from the pod can be added when needed
// user info is retrieved via service account
tags := map[string]string{}
if app.Namespace == "" {
tags[constants.AppTagNamespace] = constants.DefaultAppNamespace
} else {
tags[constants.AppTagNamespace] = app.Namespace
}
return interfaces.ApplicationMetadata{
ApplicationID: appID,
QueueName: app.Spec.Queue,
User: "default",
Tags: tags,
}, true
}
func constructAppID(name string, namespace string) string {
return namespace + appIDDelimiter + name
}
func getNameFromAppID(appID string) (string, error) {
if len(appID) > 0 {
splitted := strings.Split(appID, appIDDelimiter)
if len(splitted) < 2 {
return "", fmt.Errorf("unknown appID format")
}
return splitted[1], nil
}
return "", fmt.Errorf("appID should not be empty")
}
const undefinedState = "Undefined"
func convertShimAppStateToAppCRDState(status string) appv1.ApplicationStateType {
switch status {
case "New":
return appv1.NewApplicationState
case "Accepted":
return appv1.AcceptedState
case "Starting":
return appv1.StartingState
case "Running":
return appv1.RunningState
case "Waiting":
return appv1.WaitingState
case "Rejected":
return appv1.RejectedState
case "Completed":
return appv1.CompletedState
case "Killed":
return appv1.KilledState
default:
return undefinedState
}
}