/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim

import (
    "sync"
    "time"

    "go.uber.org/zap"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"

    "github.com/apache/yunikorn-k8shim/pkg/appmgmt"
    "github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
    "github.com/apache/yunikorn-k8shim/pkg/cache"
    "github.com/apache/yunikorn-k8shim/pkg/callback"
    "github.com/apache/yunikorn-k8shim/pkg/client"
    "github.com/apache/yunikorn-k8shim/pkg/common/utils"
    "github.com/apache/yunikorn-k8shim/pkg/conf"
    "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
    "github.com/apache/yunikorn-k8shim/pkg/log"
    "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
    "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

// KubernetesShim is the shim scheduler: it watches the Kubernetes API server and
// interacts with the YuniKorn core scheduler to allocate pods.
type KubernetesShim struct {
    apiFactory           client.APIProvider
    context              *cache.Context
    appManager           *appmgmt.AppManagementService
    phManager            *cache.PlaceholderManager
    callback             api.ResourceManagerCallback
    stopChan             chan struct{}
    lock                 *sync.RWMutex
    outstandingAppsFound bool
}

var (
    // timeout for logging a message if no outstanding apps were found for scheduling
    outstandingAppLogTimeout = 2 * time.Minute
)

func NewShimScheduler(scheduler api.SchedulerAPI, configs *conf.SchedulerConf, bootstrapConfigMaps []*v1.ConfigMap) *KubernetesShim {
    kubeClient := client.NewKubeClient(configs.KubeConfig)

    // informer re-sync is disabled (period 0): we rely on watch events to stay up-to-date
    informerFactory := informers.NewSharedInformerFactory(kubeClient.GetClientSet(), 0)
    apiFactory := client.NewAPIFactory(scheduler, informerFactory, configs, false)
    context := cache.NewContextWithBootstrapConfigMaps(apiFactory, bootstrapConfigMaps)
    rmCallback := callback.NewAsyncRMCallback(context)
    appManager := appmgmt.NewAMService(context, apiFactory)
    return newShimSchedulerInternal(context, apiFactory, appManager, rmCallback)
}

func NewShimSchedulerForPlugin(scheduler api.SchedulerAPI, informerFactory informers.SharedInformerFactory, configs *conf.SchedulerConf, bootstrapConfigMaps []*v1.ConfigMap) *KubernetesShim {
    apiFactory := client.NewAPIFactory(scheduler, informerFactory, configs, false)
    context := cache.NewContextWithBootstrapConfigMaps(apiFactory, bootstrapConfigMaps)
    utils.SetPluginMode(true)
    rmCallback := callback.NewAsyncRMCallback(context)
    appManager := appmgmt.NewAMService(context, apiFactory)
    return newShimSchedulerInternal(context, apiFactory, appManager, rmCallback)
}

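// Illustrative sketch only, not code used by the shim itself: a caller is expected to construct
// the shim with an initialized scheduler API client, the parsed scheduler configuration and the
// bootstrap configmaps, then start it. The variable names below are hypothetical:
//
//    shim := NewShimScheduler(schedulerAPI, schedulerConf, bootstrapConfigMaps)
//    if err := shim.Run(); err != nil {
//        log.Log(log.ShimScheduler).Fatal("failed to start shim scheduler", zap.Error(err))
//    }
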
// this is visible for testing
func newShimSchedulerInternal(ctx *cache.Context, apiFactory client.APIProvider,
    am *appmgmt.AppManagementService, cb api.ResourceManagerCallback) *KubernetesShim {
    ss := &KubernetesShim{
        apiFactory:           apiFactory,
        context:              ctx,
        appManager:           am,
        phManager:            cache.NewPlaceholderManager(apiFactory.GetAPIs()),
        callback:             cb,
        stopChan:             make(chan struct{}),
        lock:                 &sync.RWMutex{},
        outstandingAppsFound: false,
    }

    // init dispatcher
    dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, ctx.ApplicationEventHandler())
    dispatcher.RegisterEventHandler(dispatcher.EventTypeTask, ctx.TaskEventHandler())
    dispatcher.RegisterEventHandler(dispatcher.EventTypeNode, ctx.SchedulerNodeEventHandler())

    return ss
}

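// The handlers registered above are how application, task and node events reach the cache:
// other components post events through the dispatcher rather than calling the context directly.
// A rough sketch of the flow (the event value is hypothetical, not an exact call from this file):
//
//    // somewhere in the shim, after an application state change is requested:
//    dispatcher.Dispatch(someApplicationEvent) // routed to ctx.ApplicationEventHandler()
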
func (ss *KubernetesShim) GetContext() *cache.Context {
    return ss.context
}

func (ss *KubernetesShim) recoverSchedulerState() error {
    log.Log(log.ShimScheduler).Info("recovering scheduler states")
    // step 1: recover all applications
    // in this step, we collect all the existing allocated pods from the api-server,
    // identify the scheduling identity (aka applicationInfo) from each pod,
    // and then add these applications to the scheduler.
    if err := ss.appManager.WaitForRecovery(); err != nil {
        // failed
        log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err))
        return err
    }

    // step 2: recover existing allocations
    // in this step, we collect all existing allocations (allocated pods) from the api-server
    // and rerun the scheduling for these allocations in order to restore the scheduler state;
    // the rerun is a replay, not an actual scheduling procedure.
    recoverableAppManagers := make([]interfaces.Recoverable, 0)
    for _, appMgr := range ss.appManager.GetAllManagers() {
        if m, ok := appMgr.(interfaces.Recoverable); ok {
            recoverableAppManagers = append(recoverableAppManagers, m)
        }
    }
    if err := ss.context.WaitForRecovery(recoverableAppManagers, 5*time.Minute); err != nil {
        // failed
        log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err))
        return err
    }

    // success
    log.Log(log.ShimScheduler).Info("scheduler recovery succeeded")
    return nil
}

func (ss *KubernetesShim) doScheduling() {
    // add event handlers to the context
    ss.context.AddSchedulingEventHandlers()

    // run main scheduling loop
    go wait.Until(ss.schedule, conf.GetSchedulerConf().GetSchedulingInterval(), ss.stopChan)
    // log a message if no outstanding requests were found for a while
    go wait.Until(ss.checkOutstandingApps, outstandingAppLogTimeout, ss.stopChan)
}

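// wait.Until (from k8s.io/apimachinery) invokes the given function repeatedly, sleeping for the
// supplied period between invocations, until the stop channel is closed or receives a value.
// A minimal illustration of the same pattern (not code used by the shim; doOnePass is hypothetical):
//
//    stop := make(chan struct{})
//    go wait.Until(func() { doOnePass() }, time.Second, stop)
//    // ... later ...
//    close(stop) // terminates the loop
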
func (ss *KubernetesShim) registerShimLayer() error {
    configuration := conf.GetSchedulerConf()
    buildInfoMap := conf.GetBuildInfoMap()

    configMaps, err := ss.context.LoadConfigMaps()
    if err != nil {
        log.Log(log.ShimScheduler).Error("failed to load yunikorn configmaps", zap.Error(err))
        return err
    }

    confMap := conf.FlattenConfigMaps(configMaps)
    config := utils.GetCoreSchedulerConfigFromConfigMap(confMap)
    extraConfig := utils.GetExtraConfigFromConfigMap(confMap)

    registerMessage := si.RegisterResourceManagerRequest{
        RmID:        configuration.ClusterID,
        Version:     configuration.ClusterVersion,
        PolicyGroup: configuration.PolicyGroup,
        BuildInfo:   buildInfoMap,
        Config:      config,
        ExtraConfig: extraConfig,
    }

    log.Log(log.ShimScheduler).Info("register RM to the scheduler",
        zap.String("clusterID", configuration.ClusterID),
        zap.String("clusterVersion", configuration.ClusterVersion),
        zap.String("policyGroup", configuration.PolicyGroup),
        zap.Any("buildInfo", buildInfoMap))
    if _, err := ss.apiFactory.GetAPIs().SchedulerAPI.
        RegisterResourceManager(&registerMessage, ss.callback); err != nil {
        return err
    }
    return nil
}

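// The loaded yunikorn configmaps (defaults plus user overrides) are flattened into a single
// key/value map before registration; later maps are expected to win on duplicate keys.
// A hedged illustration of the effect (the key and values here are made up):
//
//    defaults := &v1.ConfigMap{Data: map[string]string{"log.level": "INFO"}}
//    overrides := &v1.ConfigMap{Data: map[string]string{"log.level": "DEBUG"}}
//    flat := conf.FlattenConfigMaps([]*v1.ConfigMap{defaults, overrides})
//    // flat["log.level"] == "DEBUG"
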
// each scheduling iteration, we scan all apps and trigger app state transitions
func (ss *KubernetesShim) schedule() {
    apps := ss.context.GetAllApplications()
    for _, app := range apps {
        if app.Schedule() {
            ss.setOutstandingAppsFound(true)
        }
    }
}

func (ss *KubernetesShim) Run() error {
    // NOTE: the order in which these services are started matters,
    // please read the comments below before changing the order

    // run dispatcher
    // the dispatcher handles the basic event dispatching,
    // it needs to be started first
    dispatcher.Start()

    // run the placeholder manager
    ss.phManager.Start()

    // run the client library code that communicates with Kubernetes
    ss.apiFactory.Start()

    // register shim with core
    if err := ss.registerShimLayer(); err != nil {
        log.Log(log.ShimScheduler).Error("failed to register shim with core", zap.Error(err))
        ss.Stop()
        return err
    }

    // run app managers
    // the app manager launches the pod event handlers,
    // it needs to be started after the shim is registered with the core
    if err := ss.appManager.Start(); err != nil {
        log.Log(log.ShimScheduler).Error("failed to start app manager", zap.Error(err))
        ss.Stop()
        return err
    }

    // recover scheduler state
    if err := ss.recoverSchedulerState(); err != nil {
        log.Log(log.ShimScheduler).Error("failed to recover scheduler state", zap.Error(err))
        ss.Stop()
        return err
    }

    // start scheduling loop
    ss.doScheduling()
    return nil
}

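// Run returns as soon as the background goroutines are started; it does not block. A caller that
// wants to keep the process alive typically waits for a shutdown signal itself, for example
// (an illustrative sketch, not the shim's actual entry point):
//
//    if err := shim.Run(); err != nil {
//        os.Exit(1)
//    }
//    signalChan := make(chan os.Signal, 1)
//    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
//    <-signalChan
//    shim.Stop()
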
func (ss *KubernetesShim) Stop() {
    log.Log(log.ShimScheduler).Info("stopping scheduler")
    select {
    case ss.stopChan <- struct{}{}:
        // stop the dispatcher
        dispatcher.Stop()
        // stop the app manager
        ss.appManager.Stop()
        // stop the placeholder manager
        ss.phManager.Stop()
    default:
        log.Log(log.ShimScheduler).Info("scheduler is already stopped")
    }
}

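// checkOutstandingApps works together with schedule(): schedule() raises outstandingAppsFound
// whenever at least one application had work to do in an iteration, and this periodic check
// (every outstandingAppLogTimeout) logs a hint when the flag stayed down, then lowers it again
// so the next interval starts fresh.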
func (ss *KubernetesShim) checkOutstandingApps() {
    if !ss.getOutstandingAppsFound() {
        log.Log(log.ShimScheduler).Info("No outstanding apps found for a while", zap.Duration("timeout", outstandingAppLogTimeout))
        return
    }
    ss.setOutstandingAppsFound(false)
}

func (ss *KubernetesShim) getOutstandingAppsFound() bool {
    ss.lock.RLock()
    defer ss.lock.RUnlock()
    return ss.outstandingAppsFound
}

func (ss *KubernetesShim) setOutstandingAppsFound(value bool) {
    ss.lock.Lock()
    defer ss.lock.Unlock()
    ss.outstandingAppsFound = value
}