/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache

import (
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"go.uber.org/zap"
	v1 "k8s.io/api/core/v1"

	"github.com/apache/yunikorn-k8shim/pkg/client"
	"github.com/apache/yunikorn-k8shim/pkg/common/utils"
	"github.com/apache/yunikorn-k8shim/pkg/log"
)
// PlaceholderManager is a service to manage the lifecycle of app placeholders
type PlaceholderManager struct {
	// clients can never be nil; even the kubeclient cannot be nil, as the shim
	// will not start without it
	clients *client.Clients
	// when the placeholder manager is unable to delete a pod, that pod becomes
	// an "orphan" pod. We add orphans to this map and keep retrying the delete
	// so that their resources are not wasted.
	orphanPods  map[string]*v1.Pod
	stopChan    chan struct{}
	running     atomic.Value
	cleanupTime time.Duration
	// guards orphanPods and cleanupTime: read paths take RLock, write paths take Lock
	sync.RWMutex
}
var (
	placeholderMgr *PlaceholderManager
	mu             sync.Mutex
)
// NewPlaceholderManager creates the PlaceholderManager singleton, in the
// stopped state, and stores it for retrieval via getPlaceholderManager.
func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
	mu.Lock()
	defer mu.Unlock()
	var r atomic.Value
	r.Store(false)
	placeholderMgr = &PlaceholderManager{
		clients:     clients,
		running:     r,
		orphanPods:  make(map[string]*v1.Pod),
		stopChan:    make(chan struct{}),
		cleanupTime: 5 * time.Second,
	}
	return placeholderMgr
}
func getPlaceholderManager() *PlaceholderManager {
	mu.Lock()
	defer mu.Unlock()
	return placeholderMgr
}
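
// A minimal usage sketch (hypothetical caller, not part of this file): the
// shim is assumed to construct the singleton once at startup and stop it on
// shutdown; "clients" stands for an already initialised *client.Clients.
//
//	mgr := NewPlaceholderManager(clients)
//	mgr.Start()
//	defer mgr.Stop()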
func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
	mgr.Lock()
	defer mgr.Unlock()

	existingPlaceHolders := map[string]struct{}{}
	for _, phTask := range app.GetPlaceHolderTasks() {
		existingPlaceHolders[phTask.GetTaskPod().GetName()] = struct{}{}
	}

	// iterate all task groups, create placeholders for all the min members
	for _, tg := range app.getTaskGroups() {
		for i := int32(0); i < tg.MinMember; i++ {
			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
			// when performing recovery, do not create pods that are already running
			if _, ok := existingPlaceHolders[placeholderName]; ok {
				log.Logger().Info("Placeholder pod already exists",
					zap.String("name", placeholderName))
				continue
			}
			placeholder := newPlaceholder(placeholderName, app, tg)
			// create the placeholder on K8s
			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
			if err != nil {
				log.Logger().Error("failed to create placeholder pod",
					zap.Error(err))
				return err
			}
			log.Logger().Info("placeholder created",
				zap.String("placeholder", placeholder.String()))
		}
	}
	return nil
}
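
// For illustration only (a sketch, assuming the TaskGroup type returned by
// getTaskGroups exposes the Name and MinMember fields used above; other
// fields are omitted): a task group with MinMember set to 3 makes the loop
// above request three placeholder pods, one per generated name.
//
//	tg := TaskGroup{Name: "workers", MinMember: 3}
//	// createAppPlaceholders attempts one placeholder per index 0, 1, 2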
// cleanUp removes all the placeholders for an application
func (mgr *PlaceholderManager) cleanUp(app *Application) {
	mgr.Lock()
	defer mgr.Unlock()
	log.Logger().Info("start to clean up app placeholders",
		zap.String("appID", app.GetApplicationID()))
	for taskID, task := range app.taskMap {
		if task.IsPlaceholder() {
			// remove the placeholder pod from K8s
			err := mgr.clients.KubeClient.Delete(task.pod)
			if err != nil {
				log.Logger().Warn("failed to clean up placeholder pod",
					zap.Error(err))
				// a pod that is already gone needs no retry; any other delete
				// failure is tracked as an orphan and retried later
				if !strings.Contains(err.Error(), "not found") {
					mgr.orphanPods[taskID] = task.pod
				}
			}
		}
	}
	log.Logger().Info("finished cleaning up app placeholders",
		zap.String("appID", app.GetApplicationID()))
}
func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
	mgr.Lock()
	defer mgr.Unlock()
	for taskID, pod := range mgr.orphanPods {
		log.Logger().Debug("start to clean up orphan pod",
			zap.String("taskID", taskID),
			zap.String("podName", pod.Name))
		err := mgr.clients.KubeClient.Delete(pod)
		if err != nil {
			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
		} else {
			// deleting from a map while ranging over it is safe in Go
			delete(mgr.orphanPods, taskID)
		}
	}
}
func (mgr *PlaceholderManager) Start() {
	if mgr.isRunning() {
		log.Logger().Info("PlaceholderManager is already started")
		return
	}
	log.Logger().Info("starting the PlaceholderManager")
	mgr.setRunning(true)
	go func() {
		// retry deleting orphan placeholders on every cleanupTime tick
		// (5 seconds by default) until the manager is stopped
		for {
			select {
			case <-mgr.stopChan:
				mgr.setRunning(false)
				log.Logger().Info("PlaceholderManager has been stopped")
				return
			case <-time.After(mgr.getCleanupTime()):
				mgr.cleanOrphanPlaceholders()
			}
		}
	}()
}
func (mgr *PlaceholderManager) Stop() {
	if !mgr.isRunning() {
		log.Logger().Info("PlaceholderManager already stopped")
		return
	}
	log.Logger().Info("stopping the PlaceholderManager")
	mgr.stopChan <- struct{}{}
}
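
// Design note (from the code above): stopChan is unbuffered, so Stop blocks
// until the background goroutine receives the signal; the running flag is
// flipped back to false by that goroutine, not by Stop itself.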
func (mgr *PlaceholderManager) isRunning() bool {
	return mgr.running.Load().(bool)
}

func (mgr *PlaceholderManager) setRunning(flag bool) {
	mgr.running.Store(flag)
}

func (mgr *PlaceholderManager) getOrphanPodsLength() int {
	mgr.RLock()
	defer mgr.RUnlock()
	return len(mgr.orphanPods)
}

func (mgr *PlaceholderManager) setCleanupTime(value time.Duration) {
	mgr.Lock()
	defer mgr.Unlock()
	mgr.cleanupTime = value
}

func (mgr *PlaceholderManager) getCleanupTime() time.Duration {
	mgr.RLock()
	defer mgr.RUnlock()
	return mgr.cleanupTime
}
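
// A sketch of how a test might shorten the orphan-pod retry interval
// (hypothetical test code, relying on the package-private setter above):
//
//	mgr := NewPlaceholderManager(clients)
//	mgr.setCleanupTime(100 * time.Millisecond)
//	mgr.Start()
//	defer mgr.Stop()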