blob: 50b65a67a3ae38f57d70c2c09bada9f40604d222 [file] [log] [blame]
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletconfig
import (
"fmt"
"path/filepath"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
utilpanic "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/panic"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
storeDir = "store"
// TODO(mtaufen): We may expose this in a future API, but for the time being we use an internal default,
// because it is not especially clear where this should live in the API.
configTrialDuration = 10 * time.Minute
)
// TransformFunc edits the KubeletConfiguration in-place, and returns an
// error if any of the transformations failed.
type TransformFunc func(kc *kubeletconfig.KubeletConfiguration) error
// Controller manages syncing dynamic Kubelet configurations
// For more information, see the proposal: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/dynamic-kubelet-configuration.md
type Controller struct {
// transform applies an arbitrary transformation to config after loading, and before validation.
// This can be used, for example, to include config from flags before the controller's validation step.
// If transform returns an error, loadConfig will fail, and an InternalError will be reported.
// Be wary if using this function as an extension point, in most cases the controller should
// probably just be natively extended to do what you need. Injecting flag precedence transformations
// is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
// controller's tree (pkg/) is not.
transform TransformFunc
// pendingConfigSource; write to this channel to indicate that the config source needs to be synced from the API server
pendingConfigSource chan bool
// configStatus manages the status we report on the Node object
configStatus status.NodeConfigStatus
// nodeInformer is the informer that watches the Node object
nodeInformer cache.SharedInformer
// remoteConfigSourceInformer is the informer that watches the assigned config source
remoteConfigSourceInformer cache.SharedInformer
// checkpointStore persists config source checkpoints to a storage layer
checkpointStore store.Store
}
// NewController constructs a new Controller object and returns it. The dynamicConfigDir
// path must be absolute. transform applies an arbitrary transformation to config after loading, and before validation.
// This can be used, for example, to include config from flags before the controller's validation step.
// If transform returns an error, loadConfig will fail, and an InternalError will be reported.
// Be wary if using this function as an extension point, in most cases the controller should
// probably just be natively extended to do what you need. Injecting flag precedence transformations
// is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
// controller's tree (pkg/) is not.
func NewController(dynamicConfigDir string, transform TransformFunc) *Controller {
return &Controller{
transform: transform,
// channels must have capacity at least 1, since we signal with non-blocking writes
pendingConfigSource: make(chan bool, 1),
configStatus: status.NewNodeConfigStatus(),
checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, storeDir)),
}
}
// Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
// or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
// If the pre-existing local configuration should be used, Bootstrap returns a nil config.
func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
utillog.Infof("starting controller")
// ensure the filesystem is initialized
if err := cc.initializeDynamicConfigDir(); err != nil {
return nil, err
}
// determine assigned source and set status
assignedSource, err := cc.checkpointStore.Assigned()
if err != nil {
return nil, err
}
if assignedSource != nil {
cc.configStatus.SetAssigned(assignedSource.NodeConfigSource())
}
// determine last-known-good source and set status
lastKnownGoodSource, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return nil, err
}
if lastKnownGoodSource != nil {
cc.configStatus.SetLastKnownGood(lastKnownGoodSource.NodeConfigSource())
}
// if the assigned source is nil, return nil to indicate local config
if assignedSource == nil {
return nil, nil
}
// attempt to load assigned config
assignedConfig, reason, err := cc.loadConfig(assignedSource)
if err == nil {
// update the active source to the non-nil assigned source
cc.configStatus.SetActive(assignedSource.NodeConfigSource())
// update the last-known-good config if necessary, and start a timer that
// periodically checks whether the last-known good needs to be updated
// we only do this when the assigned config loads and passes validation
// wait.Forever will call the func once before starting the timer
go wait.Forever(func() { cc.checkTrial(configTrialDuration) }, 10*time.Second)
return assignedConfig, nil
} // Assert: the assigned config failed to load or validate
// TODO(mtaufen): consider re-attempting download when a load/verify/parse/validate
// error happens outside trial period, we already made it past the trial so it's probably filesystem corruption
// or something else scary
// log the reason and error details for the failure to load the assigned config
utillog.Errorf(fmt.Sprintf("%s, error: %v", reason, err))
// set status to indicate the failure with the assigned config
cc.configStatus.SetError(reason)
// if the last-known-good source is nil, return nil to indicate local config
if lastKnownGoodSource == nil {
return nil, nil
}
// attempt to load the last-known-good config
lastKnownGoodConfig, _, err := cc.loadConfig(lastKnownGoodSource)
if err != nil {
// we failed to load the last-known-good, so something is really messed up and we just return the error
return nil, err
}
// set status to indicate the active source is the non-nil last-known-good source
cc.configStatus.SetActive(lastKnownGoodSource.NodeConfigSource())
return lastKnownGoodConfig, nil
}
// StartSync tells the controller to start the goroutines that sync status/config to/from the API server.
// The clients must be non-nil, and the nodeName must be non-empty.
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) error {
const errFmt = "cannot start Kubelet config sync: %s"
if client == nil {
return fmt.Errorf(errFmt, "nil client")
}
if eventClient == nil {
return fmt.Errorf(errFmt, "nil event client")
}
if nodeName == "" {
return fmt.Errorf(errFmt, "empty nodeName")
}
// Rather than use utilruntime.HandleCrash, which doesn't actually crash in the Kubelet,
// we use HandlePanic to manually call the panic handlers and then crash.
// We have a better chance of recovering normal operation if we just restart the Kubelet in the event
// of a Go runtime error.
// NOTE(mtaufen): utilpanic.HandlePanic returns a function and you have to call it for your thing to run!
// This was EVIL to debug (difficult to see missing `()`).
// The code now uses `go name()` instead of `go utilpanic.HandlePanic(func(){...})()` to avoid confusion.
// status sync worker
statusSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting status sync loop")
wait.JitterUntil(func() {
cc.configStatus.Sync(client, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})
// remote config source informer, if we have a remote source to watch
assignedSource, err := cc.checkpointStore.Assigned()
if err != nil {
return fmt.Errorf(errFmt, err)
} else if assignedSource == nil {
utillog.Infof("local source is assigned, will not start remote config source informer")
} else {
cc.remoteConfigSourceInformer = assignedSource.Informer(client, cache.ResourceEventHandlerFuncs{
AddFunc: cc.onAddRemoteConfigSourceEvent,
UpdateFunc: cc.onUpdateRemoteConfigSourceEvent,
DeleteFunc: cc.onDeleteRemoteConfigSourceEvent,
},
)
}
remoteConfigSourceInformerFunc := utilpanic.HandlePanic(func() {
if cc.remoteConfigSourceInformer != nil {
utillog.Infof("starting remote config source informer")
cc.remoteConfigSourceInformer.Run(wait.NeverStop)
}
})
// node informer
cc.nodeInformer = newSharedNodeInformer(client, nodeName,
cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
nodeInformerFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Node informer")
cc.nodeInformer.Run(wait.NeverStop)
})
// config sync worker
configSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Kubelet config sync loop")
wait.JitterUntil(func() {
cc.syncConfigSource(client, eventClient, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})
go statusSyncLoopFunc()
go remoteConfigSourceInformerFunc()
go nodeInformerFunc()
go configSyncLoopFunc()
return nil
}
// loadConfig loads Kubelet config from a checkpoint
// It returns the loaded configuration or a clean failure reason (for status reporting) and an error.
func (cc *Controller) loadConfig(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, string, error) {
// load KubeletConfiguration from checkpoint
kc, err := cc.checkpointStore.Load(source)
if err != nil {
return nil, status.LoadError, err
}
// apply any required transformations to the KubeletConfiguration
if cc.transform != nil {
if err := cc.transform(kc); err != nil {
return nil, status.InternalError, err
}
}
// validate the result
if err := validation.ValidateKubeletConfiguration(kc); err != nil {
return nil, status.ValidateError, err
}
return kc, "", nil
}
// initializeDynamicConfigDir makes sure that the storage layers for various controller components are set up correctly
func (cc *Controller) initializeDynamicConfigDir() error {
utillog.Infof("ensuring filesystem is set up correctly")
// initializeDynamicConfigDir local checkpoint storage location
return cc.checkpointStore.Initialize()
}
// checkTrial checks whether the trial duration has passed, and updates the last-known-good config if necessary
func (cc *Controller) checkTrial(duration time.Duration) {
// when the trial period is over, the assigned config becomes the last-known-good
if trial, err := cc.inTrial(duration); err != nil {
utillog.Errorf("failed to check trial period for assigned config, error: %v", err)
} else if !trial {
if err := cc.graduateAssignedToLastKnownGood(); err != nil {
utillog.Errorf("failed to set last-known-good to assigned config, error: %v", err)
}
}
}
// inTrial returns true if the time elapsed since the last modification of the assigned config does not exceed `trialDur`, false otherwise
func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
now := time.Now()
t, err := cc.checkpointStore.AssignedModified()
if err != nil {
return false, err
}
if now.Sub(t) <= trialDur {
return true, nil
}
return false, nil
}
// graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore
// to the same value as the assigned config maintained by the checkpointStore
func (cc *Controller) graduateAssignedToLastKnownGood() error {
// get assigned
assigned, err := cc.checkpointStore.Assigned()
if err != nil {
return err
}
// get last-known-good
lastKnownGood, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return err
}
// if the sources are equal, no need to change
if assigned == lastKnownGood ||
assigned != nil && lastKnownGood != nil && apiequality.Semantic.DeepEqual(assigned.NodeConfigSource(), lastKnownGood.NodeConfigSource()) {
return nil
}
// update last-known-good
err = cc.checkpointStore.SetLastKnownGood(assigned)
if err != nil {
return err
}
// update the status to reflect the new last-known-good config
cc.configStatus.SetLastKnownGood(assigned.NodeConfigSource())
utillog.Infof("updated last-known-good config to %s, UID: %s, ResourceVersion: %s", assigned.APIPath(), assigned.UID(), assigned.ResourceVersion())
return nil
}