| /* |
| Copyright 2014 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| // Package app implements a Server object for running the scheduler. |
| package app |
| |
| import ( |
| "context" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "net/http" |
| "os" |
| goruntime "runtime" |
| |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| utilerrors "k8s.io/apimachinery/pkg/util/errors" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apiserver/pkg/authentication/authenticator" |
| "k8s.io/apiserver/pkg/authorization/authorizer" |
| genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" |
| apirequest "k8s.io/apiserver/pkg/endpoints/request" |
| genericfilters "k8s.io/apiserver/pkg/server/filters" |
| "k8s.io/apiserver/pkg/server/healthz" |
| "k8s.io/apiserver/pkg/server/mux" |
| "k8s.io/apiserver/pkg/server/routes" |
| utilfeature "k8s.io/apiserver/pkg/util/feature" |
| apiserverflag "k8s.io/apiserver/pkg/util/flag" |
| "k8s.io/apiserver/pkg/util/globalflag" |
| storageinformers "k8s.io/client-go/informers/storage/v1" |
| v1core "k8s.io/client-go/kubernetes/typed/core/v1" |
| "k8s.io/client-go/tools/leaderelection" |
| schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" |
| "k8s.io/kubernetes/cmd/kube-scheduler/app/options" |
| "k8s.io/kubernetes/pkg/api/legacyscheme" |
| "k8s.io/kubernetes/pkg/controller" |
| "k8s.io/kubernetes/pkg/features" |
| "k8s.io/kubernetes/pkg/scheduler" |
| "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" |
| schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" |
| latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" |
| kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" |
| "k8s.io/kubernetes/pkg/scheduler/factory" |
| "k8s.io/kubernetes/pkg/scheduler/metrics" |
| "k8s.io/kubernetes/pkg/util/configz" |
| utilflag "k8s.io/kubernetes/pkg/util/flag" |
| "k8s.io/kubernetes/pkg/version" |
| "k8s.io/kubernetes/pkg/version/verflag" |
| |
| "github.com/prometheus/client_golang/prometheus" |
| "github.com/spf13/cobra" |
| "k8s.io/klog" |
| ) |
| |
| // NewSchedulerCommand creates a *cobra.Command object with default parameters |
| func NewSchedulerCommand() *cobra.Command { |
| opts, err := options.NewOptions() |
| if err != nil { |
| klog.Fatalf("unable to initialize command options: %v", err) |
| } |
| |
| cmd := &cobra.Command{ |
| Use: "kube-scheduler", |
| Long: `The Kubernetes scheduler is a policy-rich, topology-aware, |
| workload-specific function that significantly impacts availability, performance, |
| and capacity. The scheduler needs to take into account individual and collective |
| resource requirements, quality of service requirements, hardware/software/policy |
| constraints, affinity and anti-affinity specifications, data locality, inter-workload |
| interference, deadlines, and so on. Workload-specific requirements will be exposed |
| through the API as necessary.`, |
| Run: func(cmd *cobra.Command, args []string) { |
| if err := runCommand(cmd, args, opts); err != nil { |
| fmt.Fprintf(os.Stderr, "%v\n", err) |
| os.Exit(1) |
| } |
| }, |
| } |
| fs := cmd.Flags() |
| namedFlagSets := opts.Flags() |
| verflag.AddFlags(namedFlagSets.FlagSet("global")) |
| globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name()) |
| for _, f := range namedFlagSets.FlagSets { |
| fs.AddFlagSet(f) |
| } |
| |
| usageFmt := "Usage:\n %s\n" |
| cols, _, _ := apiserverflag.TerminalSize(cmd.OutOrStdout()) |
| cmd.SetUsageFunc(func(cmd *cobra.Command) error { |
| fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine()) |
| apiserverflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols) |
| return nil |
| }) |
| cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) { |
| fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine()) |
| apiserverflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols) |
| }) |
| cmd.MarkFlagFilename("config", "yaml", "yml", "json") |
| |
| return cmd |
| } |
| |
| // runCommand runs the scheduler. |
| func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error { |
| verflag.PrintAndExitIfRequested() |
| utilflag.PrintFlags(cmd.Flags()) |
| |
| if len(args) != 0 { |
| fmt.Fprint(os.Stderr, "arguments are not supported\n") |
| } |
| |
| if errs := opts.Validate(); len(errs) > 0 { |
| fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs)) |
| os.Exit(1) |
| } |
| |
| if len(opts.WriteConfigTo) > 0 { |
| if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil { |
| fmt.Fprintf(os.Stderr, "%v\n", err) |
| os.Exit(1) |
| } |
| klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo) |
| } |
| |
| c, err := opts.Config() |
| if err != nil { |
| fmt.Fprintf(os.Stderr, "%v\n", err) |
| os.Exit(1) |
| } |
| |
| stopCh := make(chan struct{}) |
| |
| // Get the completed config |
| cc := c.Complete() |
| |
| // To help debugging, immediately log version |
| klog.Infof("Version: %+v", version.Get()) |
| |
| // Apply algorithms based on feature gates. |
| // TODO: make configurable? |
| algorithmprovider.ApplyFeatureGates() |
| |
| // Configz registration. |
| if cz, err := configz.New("componentconfig"); err == nil { |
| cz.Set(cc.ComponentConfig) |
| } else { |
| return fmt.Errorf("unable to register configz: %s", err) |
| } |
| |
| return Run(cc, stopCh) |
| } |
| |
| // Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed. |
| func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error { |
| var storageClassInformer storageinformers.StorageClassInformer |
| if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { |
| storageClassInformer = cc.InformerFactory.Storage().V1().StorageClasses() |
| } |
| |
| // Create the scheduler. |
| sched, err := scheduler.New(cc.Client, |
| cc.InformerFactory.Core().V1().Nodes(), |
| cc.PodInformer, |
| cc.InformerFactory.Core().V1().PersistentVolumes(), |
| cc.InformerFactory.Core().V1().PersistentVolumeClaims(), |
| cc.InformerFactory.Core().V1().ReplicationControllers(), |
| cc.InformerFactory.Apps().V1().ReplicaSets(), |
| cc.InformerFactory.Apps().V1().StatefulSets(), |
| cc.InformerFactory.Core().V1().Services(), |
| cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), |
| storageClassInformer, |
| cc.Recorder, |
| cc.ComponentConfig.AlgorithmSource, |
| stopCh, |
| scheduler.WithName(cc.ComponentConfig.SchedulerName), |
| scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight), |
| scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling), |
| scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption), |
| scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), |
| scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds)) |
| if err != nil { |
| return err |
| } |
| |
| // Prepare the event broadcaster. |
| if cc.Broadcaster != nil && cc.EventClient != nil { |
| cc.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.EventClient.Events("")}) |
| } |
| |
| // Setup healthz checks. |
| var checks []healthz.HealthzChecker |
| if cc.ComponentConfig.LeaderElection.LeaderElect { |
| checks = append(checks, cc.LeaderElection.WatchDog) |
| } |
| |
| // Start up the healthz server. |
| if cc.InsecureServing != nil { |
| separateMetrics := cc.InsecureMetricsServing != nil |
| handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil) |
| if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil { |
| return fmt.Errorf("failed to start healthz server: %v", err) |
| } |
| } |
| if cc.InsecureMetricsServing != nil { |
| handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil) |
| if err := cc.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil { |
| return fmt.Errorf("failed to start metrics server: %v", err) |
| } |
| } |
| if cc.SecureServing != nil { |
| handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) |
| if err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil { |
| // fail early for secure handlers, removing the old error loop from above |
| return fmt.Errorf("failed to start healthz server: %v", err) |
| } |
| } |
| |
| // Start all informers. |
| go cc.PodInformer.Informer().Run(stopCh) |
| cc.InformerFactory.Start(stopCh) |
| |
| // Wait for all caches to sync before scheduling. |
| cc.InformerFactory.WaitForCacheSync(stopCh) |
| controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced) |
| |
| // Prepare a reusable runCommand function. |
| run := func(ctx context.Context) { |
| sched.Run() |
| <-ctx.Done() |
| } |
| |
| ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here |
| defer cancel() |
| |
| go func() { |
| select { |
| case <-stopCh: |
| cancel() |
| case <-ctx.Done(): |
| } |
| }() |
| |
| // If leader election is enabled, runCommand via LeaderElector until done and exit. |
| if cc.LeaderElection != nil { |
| cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ |
| OnStartedLeading: run, |
| OnStoppedLeading: func() { |
| utilruntime.HandleError(fmt.Errorf("lost master")) |
| }, |
| } |
| leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection) |
| if err != nil { |
| return fmt.Errorf("couldn't create leader elector: %v", err) |
| } |
| |
| leaderElector.Run(ctx) |
| |
| return fmt.Errorf("lost lease") |
| } |
| |
| // Leader election is disabled, so runCommand inline until done. |
| run(ctx) |
| return fmt.Errorf("finished without leader elect") |
| } |
| |
| // buildHandlerChain wraps the given handler with the standard filters. |
| func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz authorizer.Authorizer) http.Handler { |
| requestInfoResolver := &apirequest.RequestInfoFactory{} |
| failedHandler := genericapifilters.Unauthorized(legacyscheme.Codecs, false) |
| |
| handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver) |
| handler = genericapifilters.WithAuthorization(handler, authz, legacyscheme.Codecs) |
| handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil) |
| handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver) |
| handler = genericfilters.WithPanicRecovery(handler) |
| |
| return handler |
| } |
| |
| func installMetricHandler(pathRecorderMux *mux.PathRecorderMux) { |
| configz.InstallHandler(pathRecorderMux) |
| defaultMetricsHandler := prometheus.Handler().ServeHTTP |
| pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) { |
| if req.Method == "DELETE" { |
| metrics.Reset() |
| io.WriteString(w, "metrics reset\n") |
| return |
| } |
| defaultMetricsHandler(w, req) |
| }) |
| } |
| |
| // newMetricsHandler builds a metrics server from the config. |
| func newMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration) http.Handler { |
| pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler") |
| installMetricHandler(pathRecorderMux) |
| if config.EnableProfiling { |
| routes.Profiling{}.Install(pathRecorderMux) |
| if config.EnableContentionProfiling { |
| goruntime.SetBlockProfileRate(1) |
| } |
| } |
| return pathRecorderMux |
| } |
| |
// newHealthzHandler creates a healthz handler from the config, and will also
// embed the metrics handler if the healthz and metrics address configurations
// are the same (i.e. metrics are not served separately).
func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, separateMetrics bool, checks ...healthz.HealthzChecker) http.Handler {
	pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
	healthz.InstallHandler(pathRecorderMux, checks...)
	// Only expose /metrics here when there is no dedicated metrics endpoint.
	if !separateMetrics {
		installMetricHandler(pathRecorderMux)
	}
	if config.EnableProfiling {
		routes.Profiling{}.Install(pathRecorderMux)
		if config.EnableContentionProfiling {
			goruntime.SetBlockProfileRate(1)
		}
	}
	return pathRecorderMux
}
| |
| // NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests. |
| func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*factory.Config, error) { |
| var storageClassInformer storageinformers.StorageClassInformer |
| if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { |
| storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses() |
| } |
| |
| // Set up the configurator which can create schedulers from configs. |
| configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{ |
| SchedulerName: s.ComponentConfig.SchedulerName, |
| Client: s.Client, |
| NodeInformer: s.InformerFactory.Core().V1().Nodes(), |
| PodInformer: s.PodInformer, |
| PvInformer: s.InformerFactory.Core().V1().PersistentVolumes(), |
| PvcInformer: s.InformerFactory.Core().V1().PersistentVolumeClaims(), |
| ReplicationControllerInformer: s.InformerFactory.Core().V1().ReplicationControllers(), |
| ReplicaSetInformer: s.InformerFactory.Apps().V1().ReplicaSets(), |
| StatefulSetInformer: s.InformerFactory.Apps().V1().StatefulSets(), |
| ServiceInformer: s.InformerFactory.Core().V1().Services(), |
| PdbInformer: s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), |
| StorageClassInformer: storageClassInformer, |
| HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight, |
| EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), |
| DisablePreemption: s.ComponentConfig.DisablePreemption, |
| PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore, |
| BindTimeoutSeconds: *s.ComponentConfig.BindTimeoutSeconds, |
| }) |
| |
| source := s.ComponentConfig.AlgorithmSource |
| var config *factory.Config |
| switch { |
| case source.Provider != nil: |
| // Create the config from a named algorithm provider. |
| sc, err := configurator.CreateFromProvider(*source.Provider) |
| if err != nil { |
| return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err) |
| } |
| config = sc |
| case source.Policy != nil: |
| // Create the config from a user specified policy source. |
| policy := &schedulerapi.Policy{} |
| switch { |
| case source.Policy.File != nil: |
| // Use a policy serialized in a file. |
| policyFile := source.Policy.File.Path |
| _, err := os.Stat(policyFile) |
| if err != nil { |
| return nil, fmt.Errorf("missing policy config file %s", policyFile) |
| } |
| data, err := ioutil.ReadFile(policyFile) |
| if err != nil { |
| return nil, fmt.Errorf("couldn't read policy config: %v", err) |
| } |
| err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy) |
| if err != nil { |
| return nil, fmt.Errorf("invalid policy: %v", err) |
| } |
| case source.Policy.ConfigMap != nil: |
| // Use a policy serialized in a config map value. |
| policyRef := source.Policy.ConfigMap |
| policyConfigMap, err := s.Client.CoreV1().ConfigMaps(policyRef.Namespace).Get(policyRef.Name, metav1.GetOptions{}) |
| if err != nil { |
| return nil, fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err) |
| } |
| data, found := policyConfigMap.Data[kubeschedulerconfig.SchedulerPolicyConfigMapKey] |
| if !found { |
| return nil, fmt.Errorf("missing policy config map value at key %q", kubeschedulerconfig.SchedulerPolicyConfigMapKey) |
| } |
| err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy) |
| if err != nil { |
| return nil, fmt.Errorf("invalid policy: %v", err) |
| } |
| } |
| sc, err := configurator.CreateFromConfig(*policy) |
| if err != nil { |
| return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err) |
| } |
| config = sc |
| default: |
| return nil, fmt.Errorf("unsupported algorithm source: %v", source) |
| } |
| // Additional tweaks to the config produced by the configurator. |
| config.Recorder = s.Recorder |
| |
| config.DisablePreemption = s.ComponentConfig.DisablePreemption |
| return config, nil |
| } |