| /* |
| Copyright 2015 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| // Package app makes it easy to create a kubelet server for various contexts. |
| package app |
| |
| import ( |
| "context" |
| "crypto/tls" |
| "errors" |
| "fmt" |
| "math/rand" |
| "net" |
| "net/http" |
| _ "net/http/pprof" |
| "net/url" |
| "os" |
| "path" |
| "path/filepath" |
| "strconv" |
| "time" |
| |
| "github.com/coreos/go-systemd/daemon" |
| "github.com/spf13/cobra" |
| "github.com/spf13/pflag" |
| "k8s.io/klog" |
| |
| "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/api/resource" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/types" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/sets" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apiserver/pkg/server/healthz" |
| utilfeature "k8s.io/apiserver/pkg/util/feature" |
| "k8s.io/apiserver/pkg/util/flag" |
| "k8s.io/client-go/dynamic" |
| clientset "k8s.io/client-go/kubernetes" |
| v1core "k8s.io/client-go/kubernetes/typed/core/v1" |
| restclient "k8s.io/client-go/rest" |
| "k8s.io/client-go/tools/clientcmd" |
| "k8s.io/client-go/tools/record" |
| certutil "k8s.io/client-go/util/cert" |
| "k8s.io/client-go/util/certificate" |
| cloudprovider "k8s.io/cloud-provider" |
| csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" |
| kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" |
| "k8s.io/kubernetes/cmd/kubelet/app/options" |
| "k8s.io/kubernetes/pkg/api/legacyscheme" |
| api "k8s.io/kubernetes/pkg/apis/core" |
| "k8s.io/kubernetes/pkg/capabilities" |
| "k8s.io/kubernetes/pkg/credentialprovider" |
| "k8s.io/kubernetes/pkg/features" |
| "k8s.io/kubernetes/pkg/kubelet" |
| kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" |
| kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme" |
| kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation" |
| "k8s.io/kubernetes/pkg/kubelet/cadvisor" |
| kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" |
| "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap" |
| "k8s.io/kubernetes/pkg/kubelet/cm" |
| "k8s.io/kubernetes/pkg/kubelet/config" |
| kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" |
| "k8s.io/kubernetes/pkg/kubelet/dockershim" |
| dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote" |
| "k8s.io/kubernetes/pkg/kubelet/eviction" |
| evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" |
| dynamickubeletconfig "k8s.io/kubernetes/pkg/kubelet/kubeletconfig" |
| "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles" |
| "k8s.io/kubernetes/pkg/kubelet/server" |
| "k8s.io/kubernetes/pkg/kubelet/server/streaming" |
| kubetypes "k8s.io/kubernetes/pkg/kubelet/types" |
| "k8s.io/kubernetes/pkg/util/configz" |
| utilfs "k8s.io/kubernetes/pkg/util/filesystem" |
| utilflag "k8s.io/kubernetes/pkg/util/flag" |
| "k8s.io/kubernetes/pkg/util/flock" |
| "k8s.io/kubernetes/pkg/util/mount" |
| nodeutil "k8s.io/kubernetes/pkg/util/node" |
| "k8s.io/kubernetes/pkg/util/nsenter" |
| "k8s.io/kubernetes/pkg/util/oom" |
| "k8s.io/kubernetes/pkg/util/rlimit" |
| "k8s.io/kubernetes/pkg/version" |
| "k8s.io/kubernetes/pkg/version/verflag" |
| "k8s.io/utils/exec" |
| ) |
| |
// componentKubelet is the kubelet's component name. Within this file it names the
// command's flag set, the cobra command itself, and the event source component.
const componentKubelet = "kubelet"
| |
| // NewKubeletCommand creates a *cobra.Command object with default parameters |
| func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command { |
| cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError) |
| cleanFlagSet.SetNormalizeFunc(flag.WordSepNormalizeFunc) |
| kubeletFlags := options.NewKubeletFlags() |
| kubeletConfig, err := options.NewKubeletConfiguration() |
| // programmer error |
| if err != nil { |
| klog.Fatal(err) |
| } |
| |
| cmd := &cobra.Command{ |
| Use: componentKubelet, |
| Long: `The kubelet is the primary "node agent" that runs on each |
| node. The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object |
| that describes a pod. The kubelet takes a set of PodSpecs that are provided through |
| various mechanisms (primarily through the apiserver) and ensures that the containers |
| described in those PodSpecs are running and healthy. The kubelet doesn't manage |
| containers which were not created by Kubernetes. |
| |
| Other than from an PodSpec from the apiserver, there are three ways that a container |
| manifest can be provided to the Kubelet. |
| |
| File: Path passed as a flag on the command line. Files under this path will be monitored |
| periodically for updates. The monitoring period is 20s by default and is configurable |
| via a flag. |
| |
| HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint |
| is checked every 20 seconds (also configurable with a flag). |
| |
| HTTP server: The kubelet can also listen for HTTP and respond to a simple API |
| (underspec'd currently) to submit a new manifest.`, |
| // The Kubelet has special flag parsing requirements to enforce flag precedence rules, |
| // so we do all our parsing manually in Run, below. |
| // DisableFlagParsing=true provides the full set of flags passed to the kubelet in the |
| // `args` arg to Run, without Cobra's interference. |
| DisableFlagParsing: true, |
| Run: func(cmd *cobra.Command, args []string) { |
| // initial flag parse, since we disable cobra's flag parsing |
| if err := cleanFlagSet.Parse(args); err != nil { |
| cmd.Usage() |
| klog.Fatal(err) |
| } |
| |
| // check if there are non-flag arguments in the command line |
| cmds := cleanFlagSet.Args() |
| if len(cmds) > 0 { |
| cmd.Usage() |
| klog.Fatalf("unknown command: %s", cmds[0]) |
| } |
| |
| // short-circuit on help |
| help, err := cleanFlagSet.GetBool("help") |
| if err != nil { |
| klog.Fatal(`"help" flag is non-bool, programmer error, please correct`) |
| } |
| if help { |
| cmd.Help() |
| return |
| } |
| |
| // short-circuit on verflag |
| verflag.PrintAndExitIfRequested() |
| utilflag.PrintFlags(cleanFlagSet) |
| |
| // set feature gates from initial flags-based config |
| if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil { |
| klog.Fatal(err) |
| } |
| |
| // validate the initial KubeletFlags |
| if err := options.ValidateKubeletFlags(kubeletFlags); err != nil { |
| klog.Fatal(err) |
| } |
| |
| if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") { |
| klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead") |
| } |
| |
| // load kubelet config file, if provided |
| if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 { |
| kubeletConfig, err = loadConfigFile(configFile) |
| if err != nil { |
| klog.Fatal(err) |
| } |
| // We must enforce flag precedence by re-parsing the command line into the new object. |
| // This is necessary to preserve backwards-compatibility across binary upgrades. |
| // See issue #56171 for more details. |
| if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil { |
| klog.Fatal(err) |
| } |
| // update feature gates based on new config |
| if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil { |
| klog.Fatal(err) |
| } |
| } |
| |
| // We always validate the local configuration (command line + config file). |
| // This is the default "last-known-good" config for dynamic config, and must always remain valid. |
| if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil { |
| klog.Fatal(err) |
| } |
| |
| // use dynamic kubelet config, if enabled |
| var kubeletConfigController *dynamickubeletconfig.Controller |
| if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 { |
| var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration |
| dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir, |
| func(kc *kubeletconfiginternal.KubeletConfiguration) error { |
| // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence, |
| // so that we get a complete validation at the same point where we can decide to reject dynamic config. |
| // This fixes the flag-precedence component of issue #63305. |
| // See issue #56171 for general details on flag precedence. |
| return kubeletConfigFlagPrecedence(kc, args) |
| }) |
| if err != nil { |
| klog.Fatal(err) |
| } |
| // If we should just use our existing, local config, the controller will return a nil config |
| if dynamicKubeletConfig != nil { |
| kubeletConfig = dynamicKubeletConfig |
| // Note: flag precedence was already enforced in the controller, prior to validation, |
| // by our above transform function. Now we simply update feature gates from the new config. |
| if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil { |
| klog.Fatal(err) |
| } |
| } |
| } |
| |
| // construct a KubeletServer from kubeletFlags and kubeletConfig |
| kubeletServer := &options.KubeletServer{ |
| KubeletFlags: *kubeletFlags, |
| KubeletConfiguration: *kubeletConfig, |
| } |
| |
| // use kubeletServer to construct the default KubeletDeps |
| kubeletDeps, err := UnsecuredDependencies(kubeletServer) |
| if err != nil { |
| klog.Fatal(err) |
| } |
| |
| // add the kubelet config controller to kubeletDeps |
| kubeletDeps.KubeletConfigController = kubeletConfigController |
| |
| // start the experimental docker shim, if enabled |
| if kubeletServer.KubeletFlags.ExperimentalDockershim { |
| if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil { |
| klog.Fatal(err) |
| } |
| return |
| } |
| |
| // run the kubelet |
| klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration) |
| if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil { |
| klog.Fatal(err) |
| } |
| }, |
| } |
| |
| // keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags |
| kubeletFlags.AddFlags(cleanFlagSet) |
| options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig) |
| options.AddGlobalFlags(cleanFlagSet) |
| cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name())) |
| |
| // ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags |
| const usageFmt = "Usage:\n %s\n\nFlags:\n%s" |
| cmd.SetUsageFunc(func(cmd *cobra.Command) error { |
| fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2)) |
| return nil |
| }) |
| cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) { |
| fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2)) |
| }) |
| |
| return cmd |
| } |
| |
| // newFlagSetWithGlobals constructs a new pflag.FlagSet with global flags registered |
| // on it. |
| func newFlagSetWithGlobals() *pflag.FlagSet { |
| fs := pflag.NewFlagSet("", pflag.ExitOnError) |
| // set the normalize func, similar to k8s.io/apiserver/pkg/util/flag/flags.go:InitFlags |
| fs.SetNormalizeFunc(flag.WordSepNormalizeFunc) |
| // explicitly add flags from libs that register global flags |
| options.AddGlobalFlags(fs) |
| return fs |
| } |
| |
| // newFakeFlagSet constructs a pflag.FlagSet with the same flags as fs, but where |
| // all values have noop Set implementations |
| func newFakeFlagSet(fs *pflag.FlagSet) *pflag.FlagSet { |
| ret := pflag.NewFlagSet("", pflag.ExitOnError) |
| ret.SetNormalizeFunc(fs.GetNormalizeFunc()) |
| fs.VisitAll(func(f *pflag.Flag) { |
| ret.VarP(flag.NoOp{}, f.Name, f.Shorthand, f.Usage) |
| }) |
| return ret |
| } |
| |
| // kubeletConfigFlagPrecedence re-parses flags over the KubeletConfiguration object. |
| // We must enforce flag precedence by re-parsing the command line into the new object. |
| // This is necessary to preserve backwards-compatibility across binary upgrades. |
| // See issue #56171 for more details. |
| func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration, args []string) error { |
| // We use a throwaway kubeletFlags and a fake global flagset to avoid double-parses, |
| // as some Set implementations accumulate values from multiple flag invocations. |
| fs := newFakeFlagSet(newFlagSetWithGlobals()) |
| // register throwaway KubeletFlags |
| options.NewKubeletFlags().AddFlags(fs) |
| // register new KubeletConfiguration |
| options.AddKubeletConfigFlags(fs, kc) |
| // Remember original feature gates, so we can merge with flag gates later |
| original := kc.FeatureGates |
| // re-parse flags |
| if err := fs.Parse(args); err != nil { |
| return err |
| } |
| // Add back feature gates that were set in the original kc, but not in flags |
| for k, v := range original { |
| if _, ok := kc.FeatureGates[k]; !ok { |
| kc.FeatureGates[k] = v |
| } |
| } |
| return nil |
| } |
| |
| func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, error) { |
| const errFmt = "failed to load Kubelet config file %s, error %v" |
| // compute absolute path based on current working dir |
| kubeletConfigFile, err := filepath.Abs(name) |
| if err != nil { |
| return nil, fmt.Errorf(errFmt, name, err) |
| } |
| loader, err := configfiles.NewFsLoader(utilfs.DefaultFs{}, kubeletConfigFile) |
| if err != nil { |
| return nil, fmt.Errorf(errFmt, name, err) |
| } |
| kc, err := loader.Load() |
| if err != nil { |
| return nil, fmt.Errorf(errFmt, name, err) |
| } |
| return kc, err |
| } |
| |
| // UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup |
| // is not valid. It will not start any background processes, and does not include authentication/authorization |
| func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) { |
| // Initialize the TLS Options |
| tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration) |
| if err != nil { |
| return nil, err |
| } |
| |
| mounter := mount.New(s.ExperimentalMounterPath) |
| var pluginRunner = exec.New() |
| if s.Containerized { |
| klog.V(2).Info("Running kubelet in containerized mode") |
| ne, err := nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New()) |
| if err != nil { |
| return nil, err |
| } |
| mounter = mount.NewNsenterMounter(s.RootDirectory, ne) |
| // an exec interface which can use nsenter for flex plugin calls |
| pluginRunner = nsenter.NewNsenterExecutor(nsenter.DefaultHostRootFsPath, exec.New()) |
| } |
| |
| var dockerClientConfig *dockershim.ClientConfig |
| if s.ContainerRuntime == kubetypes.DockerContainerRuntime { |
| dockerClientConfig = &dockershim.ClientConfig{ |
| DockerEndpoint: s.DockerEndpoint, |
| RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration, |
| ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration, |
| } |
| } |
| |
| return &kubelet.Dependencies{ |
| Auth: nil, // default does not enforce auth[nz] |
| CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here |
| Cloud: nil, // cloud provider might start background processes |
| ContainerManager: nil, |
| DockerClientConfig: dockerClientConfig, |
| KubeClient: nil, |
| HeartbeatClient: nil, |
| CSIClient: nil, |
| EventClient: nil, |
| Mounter: mounter, |
| OOMAdjuster: oom.NewOOMAdjuster(), |
| OSInterface: kubecontainer.RealOS{}, |
| VolumePlugins: ProbeVolumePlugins(), |
| DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner), |
| TLSOptions: tlsOptions}, nil |
| } |
| |
| // Run runs the specified KubeletServer with the given Dependencies. This should never exit. |
| // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer. |
| // Otherwise, the caller is assumed to have set up the Dependencies object and a default one will |
| // not be generated. |
| func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error { |
| // To help debugging, immediately log version |
| klog.Infof("Version: %+v", version.Get()) |
| if err := initForOS(s.KubeletFlags.WindowsService); err != nil { |
| return fmt.Errorf("failed OS init: %v", err) |
| } |
| if err := run(s, kubeDeps, stopCh); err != nil { |
| return fmt.Errorf("failed to run Kubelet: %v", err) |
| } |
| return nil |
| } |
| |
// checkPermissions reports an error unless the process is running as uid 0.
func checkPermissions() error {
	uid := os.Getuid()
	if uid == 0 {
		// TODO: Check if kubelet is running in the `initial` user namespace.
		// http://man7.org/linux/man-pages/man7/user_namespaces.7.html
		return nil
	}
	return fmt.Errorf("Kubelet needs to run as uid `0`. It is being run as %d", uid)
}
| |
| func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error { |
| scheme, _, err := kubeletscheme.NewSchemeAndCodecs() |
| if err != nil { |
| return err |
| } |
| versioned := kubeletconfigv1beta1.KubeletConfiguration{} |
| if err := scheme.Convert(kc, &versioned, nil); err != nil { |
| return err |
| } |
| cz.Set(versioned) |
| return nil |
| } |
| |
| func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error { |
| cz, err := configz.New("kubeletconfig") |
| if err != nil { |
| klog.Errorf("unable to register configz: %s", err) |
| return err |
| } |
| if err := setConfigz(cz, kc); err != nil { |
| klog.Errorf("unable to register config: %s", err) |
| return err |
| } |
| return nil |
| } |
| |
| // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise. |
| func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) { |
| if kubeDeps.Recorder != nil { |
| return |
| } |
| eventBroadcaster := record.NewBroadcaster() |
| kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)}) |
| eventBroadcaster.StartLogging(klog.V(3).Infof) |
| if kubeDeps.EventClient != nil { |
| klog.V(4).Infof("Sending events to api server.") |
| eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")}) |
| } else { |
| klog.Warning("No api server defined - no events will be sent to API server.") |
| } |
| } |
| |
| func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) { |
| // Set global feature gates based on the value on the initial KubeletServer |
| err = utilfeature.DefaultFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates) |
| if err != nil { |
| return err |
| } |
| // validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates) |
| if err := options.ValidateKubeletServer(s); err != nil { |
| return err |
| } |
| |
| // Obtain Kubelet Lock File |
| if s.ExitOnLockContention && s.LockFilePath == "" { |
| return errors.New("cannot exit on lock file contention: no lock file specified") |
| } |
| done := make(chan struct{}) |
| if s.LockFilePath != "" { |
| klog.Infof("acquiring file lock on %q", s.LockFilePath) |
| if err := flock.Acquire(s.LockFilePath); err != nil { |
| return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err) |
| } |
| if s.ExitOnLockContention { |
| klog.Infof("watching for inotify events for: %v", s.LockFilePath) |
| if err := watchForLockfileContention(s.LockFilePath, done); err != nil { |
| return err |
| } |
| } |
| } |
| |
| // Register current configuration with /configz endpoint |
| err = initConfigz(&s.KubeletConfiguration) |
| if err != nil { |
| klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err) |
| } |
| |
| // About to get clients and such, detect standaloneMode |
| standaloneMode := true |
| if len(s.KubeConfig) > 0 { |
| standaloneMode = false |
| } |
| |
| if kubeDeps == nil { |
| kubeDeps, err = UnsecuredDependencies(s) |
| if err != nil { |
| return err |
| } |
| } |
| |
| if kubeDeps.Cloud == nil { |
| if !cloudprovider.IsExternal(s.CloudProvider) { |
| cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) |
| if err != nil { |
| return err |
| } |
| if cloud == nil { |
| klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) |
| } else { |
| klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) |
| } |
| kubeDeps.Cloud = cloud |
| } |
| } |
| |
| hostName, err := nodeutil.GetHostname(s.HostnameOverride) |
| if err != nil { |
| return err |
| } |
| nodeName, err := getNodeName(kubeDeps.Cloud, hostName) |
| if err != nil { |
| return err |
| } |
| |
| if s.BootstrapKubeconfig != "" { |
| if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil { |
| return err |
| } |
| } |
| |
| // if in standalone mode, indicate as much by setting all clients to nil |
| if standaloneMode { |
| kubeDeps.KubeClient = nil |
| kubeDeps.DynamicKubeClient = nil |
| kubeDeps.EventClient = nil |
| kubeDeps.HeartbeatClient = nil |
| klog.Warningf("standalone mode, no API client") |
| } else if kubeDeps.KubeClient == nil || kubeDeps.EventClient == nil || kubeDeps.HeartbeatClient == nil || kubeDeps.DynamicKubeClient == nil { |
| // initialize clients if not standalone mode and any of the clients are not provided |
| var kubeClient clientset.Interface |
| var eventClient v1core.EventsGetter |
| var heartbeatClient clientset.Interface |
| var dynamicKubeClient dynamic.Interface |
| |
| clientConfig, err := createAPIServerClientConfig(s) |
| if err != nil { |
| return fmt.Errorf("invalid kubeconfig: %v", err) |
| } |
| |
| var clientCertificateManager certificate.Manager |
| if s.RotateCertificates && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletClientCertificate) { |
| clientCertificateManager, err = kubeletcertificate.NewKubeletClientCertificateManager(s.CertDirectory, nodeName, clientConfig.CertData, clientConfig.KeyData, clientConfig.CertFile, clientConfig.KeyFile) |
| if err != nil { |
| return err |
| } |
| } |
| // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable |
| // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper |
| // or the bootstrapping credentials to potentially lay down new initial config. |
| closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute) |
| if err != nil { |
| return err |
| } |
| |
| kubeClient, err = clientset.NewForConfig(clientConfig) |
| if err != nil { |
| klog.Warningf("New kubeClient from clientConfig error: %v", err) |
| } else if kubeClient.CertificatesV1beta1() != nil && clientCertificateManager != nil { |
| klog.V(2).Info("Starting client certificate rotation.") |
| clientCertificateManager.SetCertificateSigningRequestClient(kubeClient.CertificatesV1beta1().CertificateSigningRequests()) |
| clientCertificateManager.Start() |
| } |
| dynamicKubeClient, err = dynamic.NewForConfig(clientConfig) |
| if err != nil { |
| klog.Warningf("Failed to initialize dynamic KubeClient: %v", err) |
| } |
| |
| // make a separate client for events |
| eventClientConfig := *clientConfig |
| eventClientConfig.QPS = float32(s.EventRecordQPS) |
| eventClientConfig.Burst = int(s.EventBurst) |
| eventClient, err = v1core.NewForConfig(&eventClientConfig) |
| if err != nil { |
| klog.Warningf("Failed to create API Server client for Events: %v", err) |
| } |
| |
| // make a separate client for heartbeat with throttling disabled and a timeout attached |
| heartbeatClientConfig := *clientConfig |
| heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration |
| // if the NodeLease feature is enabled, the timeout is the minimum of the lease duration and status update frequency |
| if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) { |
| leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second |
| if heartbeatClientConfig.Timeout > leaseTimeout { |
| heartbeatClientConfig.Timeout = leaseTimeout |
| } |
| } |
| heartbeatClientConfig.QPS = float32(-1) |
| heartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig) |
| if err != nil { |
| klog.Warningf("Failed to create API Server client for heartbeat: %v", err) |
| } |
| |
| // csiClient works with CRDs that support json only |
| clientConfig.ContentType = "application/json" |
| csiClient, err := csiclientset.NewForConfig(clientConfig) |
| if err != nil { |
| klog.Warningf("Failed to create CSI API client: %v", err) |
| } |
| |
| kubeDeps.KubeClient = kubeClient |
| kubeDeps.DynamicKubeClient = dynamicKubeClient |
| if heartbeatClient != nil { |
| kubeDeps.HeartbeatClient = heartbeatClient |
| kubeDeps.OnHeartbeatFailure = closeAllConns |
| } |
| if eventClient != nil { |
| kubeDeps.EventClient = eventClient |
| } |
| kubeDeps.CSIClient = csiClient |
| } |
| |
| // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops |
| if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 && |
| kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce { |
| if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil { |
| return err |
| } |
| } |
| |
| if kubeDeps.Auth == nil { |
| auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration) |
| if err != nil { |
| return err |
| } |
| kubeDeps.Auth = auth |
| } |
| |
| if kubeDeps.CAdvisorInterface == nil { |
| imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint) |
| kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint)) |
| if err != nil { |
| return err |
| } |
| } |
| |
| // Setup event recorder if required. |
| makeEventRecorder(kubeDeps, nodeName) |
| |
| if kubeDeps.ContainerManager == nil { |
| if s.CgroupsPerQOS && s.CgroupRoot == "" { |
| klog.Infof("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /") |
| s.CgroupRoot = "/" |
| } |
| kubeReserved, err := parseResourceList(s.KubeReserved) |
| if err != nil { |
| return err |
| } |
| systemReserved, err := parseResourceList(s.SystemReserved) |
| if err != nil { |
| return err |
| } |
| var hardEvictionThresholds []evictionapi.Threshold |
| // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here. |
| if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold { |
| hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil) |
| if err != nil { |
| return err |
| } |
| } |
| experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved) |
| if err != nil { |
| return err |
| } |
| |
| devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) |
| |
| kubeDeps.ContainerManager, err = cm.NewContainerManager( |
| kubeDeps.Mounter, |
| kubeDeps.CAdvisorInterface, |
| cm.NodeConfig{ |
| RuntimeCgroupsName: s.RuntimeCgroups, |
| SystemCgroupsName: s.SystemCgroups, |
| KubeletCgroupsName: s.KubeletCgroups, |
| ContainerRuntime: s.ContainerRuntime, |
| CgroupsPerQOS: s.CgroupsPerQOS, |
| CgroupRoot: s.CgroupRoot, |
| CgroupDriver: s.CgroupDriver, |
| KubeletRootDir: s.RootDirectory, |
| ProtectKernelDefaults: s.ProtectKernelDefaults, |
| NodeAllocatableConfig: cm.NodeAllocatableConfig{ |
| KubeReservedCgroupName: s.KubeReservedCgroup, |
| SystemReservedCgroupName: s.SystemReservedCgroup, |
| EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...), |
| KubeReserved: kubeReserved, |
| SystemReserved: systemReserved, |
| HardEvictionThresholds: hardEvictionThresholds, |
| }, |
| QOSReserved: *experimentalQOSReserved, |
| ExperimentalCPUManagerPolicy: s.CPUManagerPolicy, |
| ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration, |
| ExperimentalPodPidsLimit: s.PodPidsLimit, |
| EnforceCPULimits: s.CPUCFSQuota, |
| CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration, |
| }, |
| s.FailSwapOn, |
| devicePluginEnabled, |
| kubeDeps.Recorder) |
| |
| if err != nil { |
| return err |
| } |
| } |
| |
| if err := checkPermissions(); err != nil { |
| klog.Error(err) |
| } |
| |
| utilruntime.ReallyCrash = s.ReallyCrashForTesting |
| |
| rand.Seed(time.Now().UnixNano()) |
| |
| // TODO(vmarmol): Do this through container config. |
| oomAdjuster := kubeDeps.OOMAdjuster |
| if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil { |
| klog.Warning(err) |
| } |
| |
| if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil { |
| return err |
| } |
| |
| if s.HealthzPort > 0 { |
| healthz.DefaultHealthz() |
| go wait.Until(func() { |
| err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil) |
| if err != nil { |
| klog.Errorf("Starting health server failed: %v", err) |
| } |
| }, 5*time.Second, wait.NeverStop) |
| } |
| |
| if s.RunOnce { |
| return nil |
| } |
| |
| // If systemd is used, notify it that we have started |
| go daemon.SdNotify(false, "READY=1") |
| |
| select { |
| case <-done: |
| break |
| case <-stopCh: |
| break |
| } |
| |
| return nil |
| } |
| |
| // getNodeName returns the node name according to the cloud provider |
| // if cloud provider is specified. Otherwise, returns the hostname of the node. |
| func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) { |
| if cloud == nil { |
| return types.NodeName(hostname), nil |
| } |
| |
| instances, ok := cloud.Instances() |
| if !ok { |
| return "", fmt.Errorf("failed to get instances from cloud provider") |
| } |
| |
| nodeName, err := instances.CurrentNodeName(context.TODO(), hostname) |
| if err != nil { |
| return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err) |
| } |
| |
| klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName) |
| |
| return nodeName, nil |
| } |
| |
| // InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed |
| // certificate and key file are generated. Returns a configured server.TLSOptions object. |
| func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) { |
| if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" { |
| kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt") |
| kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key") |
| |
| canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile) |
| if err != nil { |
| return nil, err |
| } |
| if !canReadCertAndKey { |
| hostName, err := nodeutil.GetHostname(kf.HostnameOverride) |
| if err != nil { |
| return nil, err |
| } |
| cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil) |
| if err != nil { |
| return nil, fmt.Errorf("unable to generate self signed cert: %v", err) |
| } |
| |
| if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil { |
| return nil, err |
| } |
| |
| if err := certutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil { |
| return nil, err |
| } |
| |
| klog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile) |
| } |
| } |
| |
| tlsCipherSuites, err := flag.TLSCipherSuites(kc.TLSCipherSuites) |
| if err != nil { |
| return nil, err |
| } |
| |
| minTLSVersion, err := flag.TLSVersion(kc.TLSMinVersion) |
| if err != nil { |
| return nil, err |
| } |
| |
| tlsOptions := &server.TLSOptions{ |
| Config: &tls.Config{ |
| MinVersion: minTLSVersion, |
| CipherSuites: tlsCipherSuites, |
| }, |
| CertFile: kc.TLSCertFile, |
| KeyFile: kc.TLSPrivateKeyFile, |
| } |
| |
| if len(kc.Authentication.X509.ClientCAFile) > 0 { |
| clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile) |
| if err != nil { |
| return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err) |
| } |
| // Specify allowed CAs for client certificates |
| tlsOptions.Config.ClientCAs = clientCAs |
| // Populate PeerCertificates in requests, but don't reject connections without verified certificates |
| tlsOptions.Config.ClientAuth = tls.RequestClientCert |
| } |
| |
| return tlsOptions, nil |
| } |
| |
| func kubeconfigClientConfig(s *options.KubeletServer) (*restclient.Config, error) { |
| return clientcmd.NewNonInteractiveDeferredLoadingClientConfig( |
| &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig}, |
| &clientcmd.ConfigOverrides{}, |
| ).ClientConfig() |
| } |
| |
| // createClientConfig creates a client configuration from the command line arguments. |
| // If --kubeconfig is explicitly set, it will be used. |
| func createClientConfig(s *options.KubeletServer) (*restclient.Config, error) { |
| if s.BootstrapKubeconfig != "" || len(s.KubeConfig) > 0 { |
| return kubeconfigClientConfig(s) |
| } else { |
| return nil, fmt.Errorf("createClientConfig called in standalone mode") |
| } |
| } |
| |
| // createAPIServerClientConfig generates a client.Config from command line flags |
| // via createClientConfig and then injects chaos into the configuration via addChaosToClientConfig. |
| func createAPIServerClientConfig(s *options.KubeletServer) (*restclient.Config, error) { |
| clientConfig, err := createClientConfig(s) |
| if err != nil { |
| return nil, err |
| } |
| |
| clientConfig.ContentType = s.ContentType |
| // Override kubeconfig qps/burst settings from flags |
| clientConfig.QPS = float32(s.KubeAPIQPS) |
| clientConfig.Burst = int(s.KubeAPIBurst) |
| |
| return clientConfig, nil |
| } |
| |
| // RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications: |
| // 1 Integration tests |
| // 2 Kubelet binary |
| // 3 Standalone 'kubernetes' binary |
| // Eventually, #2 will be replaced with instances of #3 |
| func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error { |
| hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride) |
| if err != nil { |
| return err |
| } |
| // Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil |
| nodeName, err := getNodeName(kubeDeps.Cloud, hostname) |
| if err != nil { |
| return err |
| } |
| // Setup event recorder if required. |
| makeEventRecorder(kubeDeps, nodeName) |
| |
| // TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig, |
| // so that I could remove the associated fields from KubeletConfiginternal. I would |
| // prefer this to be done as part of an independent validation step on the |
| // KubeletConfiguration. But as far as I can tell, we don't have an explicit |
| // place for validation of the KubeletConfiguration yet. |
| hostNetworkSources, err := kubetypes.GetValidatedSources(kubeServer.HostNetworkSources) |
| if err != nil { |
| return err |
| } |
| |
| hostPIDSources, err := kubetypes.GetValidatedSources(kubeServer.HostPIDSources) |
| if err != nil { |
| return err |
| } |
| |
| hostIPCSources, err := kubetypes.GetValidatedSources(kubeServer.HostIPCSources) |
| if err != nil { |
| return err |
| } |
| |
| privilegedSources := capabilities.PrivilegedSources{ |
| HostNetworkSources: hostNetworkSources, |
| HostPIDSources: hostPIDSources, |
| HostIPCSources: hostIPCSources, |
| } |
| capabilities.Setup(kubeServer.AllowPrivileged, privilegedSources, 0) |
| |
| credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory) |
| klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory) |
| |
| if kubeDeps.OSInterface == nil { |
| kubeDeps.OSInterface = kubecontainer.RealOS{} |
| } |
| |
| k, err := CreateAndInitKubelet(&kubeServer.KubeletConfiguration, |
| kubeDeps, |
| &kubeServer.ContainerRuntimeOptions, |
| kubeServer.ContainerRuntime, |
| kubeServer.RuntimeCgroups, |
| kubeServer.HostnameOverride, |
| kubeServer.NodeIP, |
| kubeServer.ProviderID, |
| kubeServer.CloudProvider, |
| kubeServer.CertDirectory, |
| kubeServer.RootDirectory, |
| kubeServer.RegisterNode, |
| kubeServer.RegisterWithTaints, |
| kubeServer.AllowedUnsafeSysctls, |
| kubeServer.RemoteRuntimeEndpoint, |
| kubeServer.RemoteImageEndpoint, |
| kubeServer.ExperimentalMounterPath, |
| kubeServer.ExperimentalKernelMemcgNotification, |
| kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount, |
| kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold, |
| kubeServer.MinimumGCAge, |
| kubeServer.MaxPerPodContainerCount, |
| kubeServer.MaxContainerCount, |
| kubeServer.MasterServiceNamespace, |
| kubeServer.RegisterSchedulable, |
| kubeServer.NonMasqueradeCIDR, |
| kubeServer.KeepTerminatedPodVolumes, |
| kubeServer.NodeLabels, |
| kubeServer.SeccompProfileRoot, |
| kubeServer.BootstrapCheckpointPath, |
| kubeServer.NodeStatusMaxImages) |
| if err != nil { |
| return fmt.Errorf("failed to create kubelet: %v", err) |
| } |
| |
| // NewMainKubelet should have set up a pod source config if one didn't exist |
| // when the builder was run. This is just a precaution. |
| if kubeDeps.PodConfig == nil { |
| return fmt.Errorf("failed to create kubelet, pod source config was nil") |
| } |
| podCfg := kubeDeps.PodConfig |
| |
| rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles)) |
| |
| // process pods and exit. |
| if runOnce { |
| if _, err := k.RunOnce(podCfg.Updates()); err != nil { |
| return fmt.Errorf("runonce failed: %v", err) |
| } |
| klog.Infof("Started kubelet as runonce") |
| } else { |
| startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer) |
| klog.Infof("Started kubelet") |
| } |
| return nil |
| } |
| |
| func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) { |
| // start the kubelet |
| go wait.Until(func() { |
| k.Run(podCfg.Updates()) |
| }, 0, wait.NeverStop) |
| |
| // start the kubelet server |
| if enableServer { |
| go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling) |
| |
| } |
| if kubeCfg.ReadOnlyPort > 0 { |
| go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort)) |
| } |
| if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) { |
| go k.ListenAndServePodResources() |
| } |
| } |
| |
| func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, |
| kubeDeps *kubelet.Dependencies, |
| crOptions *config.ContainerRuntimeOptions, |
| containerRuntime string, |
| runtimeCgroups string, |
| hostnameOverride string, |
| nodeIP string, |
| providerID string, |
| cloudProvider string, |
| certDirectory string, |
| rootDirectory string, |
| registerNode bool, |
| registerWithTaints []api.Taint, |
| allowedUnsafeSysctls []string, |
| remoteRuntimeEndpoint string, |
| remoteImageEndpoint string, |
| experimentalMounterPath string, |
| experimentalKernelMemcgNotification bool, |
| experimentalCheckNodeCapabilitiesBeforeMount bool, |
| experimentalNodeAllocatableIgnoreEvictionThreshold bool, |
| minimumGCAge metav1.Duration, |
| maxPerPodContainerCount int32, |
| maxContainerCount int32, |
| masterServiceNamespace string, |
| registerSchedulable bool, |
| nonMasqueradeCIDR string, |
| keepTerminatedPodVolumes bool, |
| nodeLabels map[string]string, |
| seccompProfileRoot string, |
| bootstrapCheckpointPath string, |
| nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) { |
| // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop |
| // up into "per source" synchronizations |
| |
| k, err = kubelet.NewMainKubelet(kubeCfg, |
| kubeDeps, |
| crOptions, |
| containerRuntime, |
| runtimeCgroups, |
| hostnameOverride, |
| nodeIP, |
| providerID, |
| cloudProvider, |
| certDirectory, |
| rootDirectory, |
| registerNode, |
| registerWithTaints, |
| allowedUnsafeSysctls, |
| remoteRuntimeEndpoint, |
| remoteImageEndpoint, |
| experimentalMounterPath, |
| experimentalKernelMemcgNotification, |
| experimentalCheckNodeCapabilitiesBeforeMount, |
| experimentalNodeAllocatableIgnoreEvictionThreshold, |
| minimumGCAge, |
| maxPerPodContainerCount, |
| maxContainerCount, |
| masterServiceNamespace, |
| registerSchedulable, |
| nonMasqueradeCIDR, |
| keepTerminatedPodVolumes, |
| nodeLabels, |
| seccompProfileRoot, |
| bootstrapCheckpointPath, |
| nodeStatusMaxImages) |
| if err != nil { |
| return nil, err |
| } |
| |
| k.BirthCry() |
| |
| k.StartGarbageCollection() |
| |
| return k, nil |
| } |
| |
| // parseResourceList parses the given configuration map into an API |
| // ResourceList or returns an error. |
| func parseResourceList(m map[string]string) (v1.ResourceList, error) { |
| if len(m) == 0 { |
| return nil, nil |
| } |
| rl := make(v1.ResourceList) |
| for k, v := range m { |
| switch v1.ResourceName(k) { |
| // CPU, memory and local storage resources are supported. |
| case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceEphemeralStorage: |
| q, err := resource.ParseQuantity(v) |
| if err != nil { |
| return nil, err |
| } |
| if q.Sign() == -1 { |
| return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v) |
| } |
| rl[v1.ResourceName(k)] = q |
| default: |
| return nil, fmt.Errorf("cannot reserve %q resource", k) |
| } |
| } |
| return rl, nil |
| } |
| |
| // BootstrapKubeletConfigController constructs and bootstrap a configuration controller |
| func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamickubeletconfig.TransformFunc) (*kubeletconfiginternal.KubeletConfiguration, *dynamickubeletconfig.Controller, error) { |
| if !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) { |
| return nil, nil, fmt.Errorf("failed to bootstrap Kubelet config controller, you must enable the DynamicKubeletConfig feature gate") |
| } |
| if len(dynamicConfigDir) == 0 { |
| return nil, nil, fmt.Errorf("cannot bootstrap Kubelet config controller, --dynamic-config-dir was not provided") |
| } |
| |
| // compute absolute path and bootstrap controller |
| dir, err := filepath.Abs(dynamicConfigDir) |
| if err != nil { |
| return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir=%s", dynamicConfigDir) |
| } |
| // get the latest KubeletConfiguration checkpoint from disk, or return the default config if no valid checkpoints exist |
| c := dynamickubeletconfig.NewController(dir, transform) |
| kc, err := c.Bootstrap() |
| if err != nil { |
| return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err) |
| } |
| return kc, c, nil |
| } |
| |
| // RunDockershim only starts the dockershim in current process. This is only used for cri validate testing purpose |
| // TODO(random-liu): Move this to a separate binary. |
| func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConfiguration, stopCh <-chan struct{}) error { |
| r := &f.ContainerRuntimeOptions |
| |
| // Initialize docker client configuration. |
| dockerClientConfig := &dockershim.ClientConfig{ |
| DockerEndpoint: r.DockerEndpoint, |
| RuntimeRequestTimeout: c.RuntimeRequestTimeout.Duration, |
| ImagePullProgressDeadline: r.ImagePullProgressDeadline.Duration, |
| } |
| |
| // Initialize network plugin settings. |
| pluginSettings := dockershim.NetworkPluginSettings{ |
| HairpinMode: kubeletconfiginternal.HairpinMode(c.HairpinMode), |
| NonMasqueradeCIDR: f.NonMasqueradeCIDR, |
| PluginName: r.NetworkPluginName, |
| PluginConfDir: r.CNIConfDir, |
| PluginBinDirString: r.CNIBinDir, |
| MTU: int(r.NetworkPluginMTU), |
| } |
| |
| // Initialize streaming configuration. (Not using TLS now) |
| streamingConfig := &streaming.Config{ |
| // Use a relative redirect (no scheme or host). |
| BaseURL: &url.URL{Path: "/cri/"}, |
| StreamIdleTimeout: c.StreamingConnectionIdleTimeout.Duration, |
| StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout, |
| SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols, |
| SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols, |
| } |
| |
| // Standalone dockershim will always start the local streaming server. |
| ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings, |
| f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, true /*startLocalStreamingServer*/) |
| if err != nil { |
| return err |
| } |
| klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.") |
| server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds) |
| if err := server.Start(); err != nil { |
| return err |
| } |
| <-stopCh |
| return nil |
| } |