| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package apiserver |
| |
| import ( |
| "net/http" |
| "time" |
| |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/runtime/schema" |
| "k8s.io/apimachinery/pkg/util/sets" |
| genericapiserver "k8s.io/apiserver/pkg/server" |
| serverstorage "k8s.io/apiserver/pkg/server/storage" |
| "k8s.io/client-go/pkg/version" |
| |
| "k8s.io/kube-aggregator/pkg/apis/apiregistration" |
| "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" |
| "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" |
| aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" |
| "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset" |
| informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion" |
| listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion" |
| openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi" |
| statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status" |
| apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest" |
| ) |
| |
| func init() { |
| // we need to add the options (like ListOptions) to empty v1 |
| metav1.AddToGroupVersion(aggregatorscheme.Scheme, schema.GroupVersion{Group: "", Version: "v1"}) |
| |
| unversioned := schema.GroupVersion{Group: "", Version: "v1"} |
| aggregatorscheme.Scheme.AddUnversionedTypes(unversioned, |
| &metav1.Status{}, |
| &metav1.APIVersions{}, |
| &metav1.APIGroupList{}, |
| &metav1.APIGroup{}, |
| &metav1.APIResourceList{}, |
| ) |
| } |
| |
| // legacyAPIServiceName is the fixed name of the only non-groupified API version |
| const legacyAPIServiceName = "v1." |
| |
| type ExtraConfig struct { |
| // ProxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use |
| // this to confirm the proxy's identity |
| ProxyClientCert []byte |
| ProxyClientKey []byte |
| |
| // If present, the Dial method will be used for dialing out to delegate |
| // apiservers. |
| ProxyTransport *http.Transport |
| |
| // Mechanism by which the Aggregator will resolve services. Required. |
| ServiceResolver ServiceResolver |
| } |
| |
| type Config struct { |
| GenericConfig *genericapiserver.RecommendedConfig |
| ExtraConfig ExtraConfig |
| } |
| |
| type completedConfig struct { |
| GenericConfig genericapiserver.CompletedConfig |
| ExtraConfig *ExtraConfig |
| } |
| |
| type CompletedConfig struct { |
| // Embed a private pointer that cannot be instantiated outside of this package. |
| *completedConfig |
| } |
| |
| // APIAggregator contains state for a Kubernetes cluster master/api server. |
| type APIAggregator struct { |
| GenericAPIServer *genericapiserver.GenericAPIServer |
| |
| delegateHandler http.Handler |
| |
| // proxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use |
| // this to confirm the proxy's identity |
| proxyClientCert []byte |
| proxyClientKey []byte |
| proxyTransport *http.Transport |
| |
| // proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name |
| proxyHandlers map[string]*proxyHandler |
| // handledGroups are the groups that already have routes |
| handledGroups sets.String |
| |
| // lister is used to add group handling for /apis/<group> aggregator lookups based on |
| // controller state |
| lister listers.APIServiceLister |
| |
| // provided for easier embedding |
| APIRegistrationInformers informers.SharedInformerFactory |
| |
| // Information needed to determine routing for the aggregator |
| serviceResolver ServiceResolver |
| |
| openAPIAggregationController *openapicontroller.AggregationController |
| } |
| |
| // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. |
| func (cfg *Config) Complete() CompletedConfig { |
| c := completedConfig{ |
| cfg.GenericConfig.Complete(), |
| &cfg.ExtraConfig, |
| } |
| |
| // the kube aggregator wires its own discovery mechanism |
| // TODO eventually collapse this by extracting all of the discovery out |
| c.GenericConfig.EnableDiscovery = false |
| version := version.Get() |
| c.GenericConfig.Version = &version |
| |
| return CompletedConfig{&c} |
| } |
| |
| // New returns a new instance of APIAggregator from the given config. |
| func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) { |
| // Prevent generic API server to install OpenAPI handler. Aggregator server |
| // has its own customized OpenAPI handler. |
| openApiConfig := c.GenericConfig.OpenAPIConfig |
| c.GenericConfig.OpenAPIConfig = nil |
| |
| genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget) |
| if err != nil { |
| return nil, err |
| } |
| |
| apiregistrationClient, err := internalclientset.NewForConfig(c.GenericConfig.LoopbackClientConfig) |
| if err != nil { |
| return nil, err |
| } |
| informerFactory := informers.NewSharedInformerFactory( |
| apiregistrationClient, |
| 5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on. |
| ) |
| |
| s := &APIAggregator{ |
| GenericAPIServer: genericServer, |
| delegateHandler: delegationTarget.UnprotectedHandler(), |
| proxyClientCert: c.ExtraConfig.ProxyClientCert, |
| proxyClientKey: c.ExtraConfig.ProxyClientKey, |
| proxyTransport: c.ExtraConfig.ProxyTransport, |
| proxyHandlers: map[string]*proxyHandler{}, |
| handledGroups: sets.String{}, |
| lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(), |
| APIRegistrationInformers: informerFactory, |
| serviceResolver: c.ExtraConfig.ServiceResolver, |
| } |
| |
| apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter) |
| if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil { |
| return nil, err |
| } |
| |
| apisHandler := &apisHandler{ |
| codecs: aggregatorscheme.Codecs, |
| lister: s.lister, |
| } |
| s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler) |
| s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler) |
| |
| apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s) |
| availableController := statuscontrollers.NewAvailableConditionController( |
| informerFactory.Apiregistration().InternalVersion().APIServices(), |
| c.GenericConfig.SharedInformerFactory.Core().V1().Services(), |
| c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(), |
| apiregistrationClient.Apiregistration(), |
| c.ExtraConfig.ProxyTransport, |
| s.serviceResolver, |
| ) |
| |
| s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { |
| informerFactory.Start(context.StopCh) |
| c.GenericConfig.SharedInformerFactory.Start(context.StopCh) |
| return nil |
| }) |
| s.GenericAPIServer.AddPostStartHook("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error { |
| go apiserviceRegistrationController.Run(context.StopCh) |
| return nil |
| }) |
| s.GenericAPIServer.AddPostStartHook("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error { |
| // if we end up blocking for long periods of time, we may need to increase threadiness. |
| go availableController.Run(5, context.StopCh) |
| return nil |
| }) |
| |
| if openApiConfig != nil { |
| specDownloader := openapicontroller.NewDownloader() |
| openAPIAggregator, err := openapicontroller.BuildAndRegisterAggregator( |
| &specDownloader, |
| delegationTarget, |
| s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(), |
| openApiConfig, |
| s.GenericAPIServer.Handler.NonGoRestfulMux) |
| if err != nil { |
| return nil, err |
| } |
| s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator) |
| |
| s.GenericAPIServer.AddPostStartHook("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error { |
| go s.openAPIAggregationController.Run(context.StopCh) |
| return nil |
| }) |
| } |
| |
| return s, nil |
| } |
| |
| // AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please. |
| // It's a slow moving API, so its ok to run the controller on a single thread |
| func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) error { |
| // if the proxyHandler already exists, it needs to be updated. The aggregation bits do not |
| // since they are wired against listers because they require multiple resources to respond |
| if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists { |
| proxyHandler.updateAPIService(apiService) |
| if s.openAPIAggregationController != nil { |
| s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService) |
| } |
| return nil |
| } |
| |
| proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version |
| // v1. is a special case for the legacy API. It proxies to a wider set of endpoints. |
| if apiService.Name == legacyAPIServiceName { |
| proxyPath = "/api" |
| } |
| |
| // register the proxy handler |
| proxyHandler := &proxyHandler{ |
| localDelegate: s.delegateHandler, |
| proxyClientCert: s.proxyClientCert, |
| proxyClientKey: s.proxyClientKey, |
| proxyTransport: s.proxyTransport, |
| serviceResolver: s.serviceResolver, |
| } |
| proxyHandler.updateAPIService(apiService) |
| if s.openAPIAggregationController != nil { |
| s.openAPIAggregationController.AddAPIService(proxyHandler, apiService) |
| } |
| s.proxyHandlers[apiService.Name] = proxyHandler |
| s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler) |
| s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler) |
| |
| // if we're dealing with the legacy group, we're done here |
| if apiService.Name == legacyAPIServiceName { |
| return nil |
| } |
| |
| // if we've already registered the path with the handler, we don't want to do it again. |
| if s.handledGroups.Has(apiService.Spec.Group) { |
| return nil |
| } |
| |
| // it's time to register the group aggregation endpoint |
| groupPath := "/apis/" + apiService.Spec.Group |
| groupDiscoveryHandler := &apiGroupHandler{ |
| codecs: aggregatorscheme.Codecs, |
| groupName: apiService.Spec.Group, |
| lister: s.lister, |
| delegate: s.delegateHandler, |
| } |
| // aggregation is protected |
| s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler) |
| s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler) |
| s.handledGroups.Insert(apiService.Spec.Group) |
| return nil |
| } |
| |
| // RemoveAPIService removes the APIService from being handled. It is not thread-safe, so only call it on one thread at a time please. |
| // It's a slow moving API, so its ok to run the controller on a single thread. |
| func (s *APIAggregator) RemoveAPIService(apiServiceName string) { |
| version := apiregistration.APIServiceNameToGroupVersion(apiServiceName) |
| |
| proxyPath := "/apis/" + version.Group + "/" + version.Version |
| // v1. is a special case for the legacy API. It proxies to a wider set of endpoints. |
| if apiServiceName == legacyAPIServiceName { |
| proxyPath = "/api" |
| } |
| s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath) |
| s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath + "/") |
| if s.openAPIAggregationController != nil { |
| s.openAPIAggregationController.RemoveAPIService(apiServiceName) |
| } |
| delete(s.proxyHandlers, apiServiceName) |
| |
| // TODO unregister group level discovery when there are no more versions for the group |
| // We don't need this right away because the handler properly delegates when no versions are present |
| } |
| |
| func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig { |
| ret := serverstorage.NewResourceConfig() |
| // NOTE: GroupVersions listed here will be enabled by default. Don't put alpha versions in the list. |
| ret.EnableVersions( |
| v1.SchemeGroupVersion, |
| v1beta1.SchemeGroupVersion, |
| ) |
| |
| return ret |
| } |