| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bootstrap |
| |
| import ( |
| "context" |
| "crypto/tls" |
| "crypto/x509" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "net" |
| "net/http" |
| "os" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/fsnotify/fsnotify" |
| prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" |
| prom "github.com/prometheus/client_golang/prometheus" |
| "golang.org/x/net/http2" |
| "golang.org/x/net/http2/h2c" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/credentials" |
| "google.golang.org/grpc/reflection" |
| "istio.io/api/security/v1beta1" |
| "istio.io/pkg/ctrlz" |
| "istio.io/pkg/filewatcher" |
| "istio.io/pkg/log" |
| "istio.io/pkg/version" |
| v1 "k8s.io/client-go/kubernetes/typed/core/v1" |
| "k8s.io/client-go/rest" |
| "k8s.io/client-go/tools/cache" |
| ) |
| |
| import ( |
| kubecredentials "github.com/apache/dubbo-go-pixiu/pilot/pkg/credentials/kube" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" |
| istiogrpc "github.com/apache/dubbo-go-pixiu/pilot/pkg/grpc" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/keycertbundle" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" |
| dubbov1alpha1 "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/dubbo/v1alpha1" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/server" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate" |
| kubecontroller "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/serviceentry" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/status" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/status/distribution" |
| tb "github.com/apache/dubbo-go-pixiu/pilot/pkg/trustbundle" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds" |
| v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3" |
| "github.com/apache/dubbo-go-pixiu/pkg/cluster" |
| "github.com/apache/dubbo-go-pixiu/pkg/config" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/constants" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/mesh" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk" |
| istiokeepalive "github.com/apache/dubbo-go-pixiu/pkg/keepalive" |
| kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube" |
| "github.com/apache/dubbo-go-pixiu/pkg/kube/inject" |
| "github.com/apache/dubbo-go-pixiu/pkg/kube/multicluster" |
| "github.com/apache/dubbo-go-pixiu/pkg/security" |
| "github.com/apache/dubbo-go-pixiu/pkg/spiffe" |
| "github.com/apache/dubbo-go-pixiu/security/pkg/k8s/chiron" |
| "github.com/apache/dubbo-go-pixiu/security/pkg/pki/ca" |
| "github.com/apache/dubbo-go-pixiu/security/pkg/pki/ra" |
| "github.com/apache/dubbo-go-pixiu/security/pkg/server/ca/authenticate" |
| "github.com/apache/dubbo-go-pixiu/security/pkg/server/ca/authenticate/kubeauth" |
| ) |
| |
| const ( |
| // debounce file watcher events to minimize noise in logs |
| watchDebounceDelay = 100 * time.Millisecond |
| ) |
| |
| func init() { |
| // Disable gRPC tracing. It has performance impacts (See https://github.com/grpc/grpc-go/issues/695) |
| grpc.EnableTracing = false |
| |
| // Export pilot version as metric for fleet analytics. |
| pilotVersion := prom.NewGaugeVec(prom.GaugeOpts{ |
| Name: "pilot_info", |
| Help: "Pilot version and build information.", |
| }, []string{"version"}) |
| prom.MustRegister(pilotVersion) |
| pilotVersion.With(prom.Labels{"version": version.Info.String()}).Set(1) |
| } |
| |
| // readinessProbe defines a function that will be used indicate whether a server is ready. |
| type readinessProbe func() (bool, error) |
| |
| // Server contains the runtime configuration for the Pilot discovery service. |
| type Server struct { |
| XDSServer *xds.DiscoveryServer |
| |
| clusterID cluster.ID |
| environment *model.Environment |
| |
| kubeClient kubelib.Client |
| |
| multiclusterController *multicluster.Controller |
| |
| configController model.ConfigStoreController |
| ConfigStores []model.ConfigStoreController |
| serviceEntryController *serviceentry.Controller |
| |
| httpServer *http.Server // debug, monitoring and readiness Server. |
| httpsServer *http.Server // webhooks HTTPS Server. |
| httpsReadyClient *http.Client |
| |
| grpcServer *grpc.Server |
| grpcAddress string |
| secureGrpcServer *grpc.Server |
| secureGrpcAddress string |
| |
| // monitoringMux listens on monitoringAddr(:15014). |
| // Currently runs prometheus monitoring and debug (if enabled). |
| monitoringMux *http.ServeMux |
| |
| // httpMux listens on the httpAddr (8080). |
| // If a Gateway is used in front and https is off it is also multiplexing |
| // the rest of the features if their port is empty. |
| // Currently runs readiness and debug (if enabled) |
| httpMux *http.ServeMux |
| |
| // httpsMux listens on the httpsAddr(15017), handling webhooks |
| // If the address os empty, the webhooks will be set on the default httpPort. |
| httpsMux *http.ServeMux // webhooks |
| |
| // MultiplexGRPC will serve gRPC and HTTP (1 or 2) over the HTTPListener, if enabled. |
| MultiplexGRPC bool |
| |
| // fileWatcher used to watch mesh config, networks and certificates. |
| fileWatcher filewatcher.FileWatcher |
| |
| // certWatcher watches the certificates for changes and triggers a notification to Istiod. |
| cacertsWatcher *fsnotify.Watcher |
| dnsNames []string |
| |
| certController *chiron.WebhookController |
| CA *ca.IstioCA |
| RA ra.RegistrationAuthority |
| |
| // TrustAnchors for workload to workload mTLS |
| workloadTrustBundle *tb.TrustBundle |
| certMu sync.RWMutex |
| istiodCert *tls.Certificate |
| istiodCertBundleWatcher *keycertbundle.Watcher |
| server server.Instance |
| |
| readinessProbes map[string]readinessProbe |
| |
| // duration used for graceful shutdown. |
| shutdownDuration time.Duration |
| |
| // internalStop is closed when the server is shutdown. This should be avoided as much as possible, in |
| // favor of AddStartFunc. This is only required if we *must* start something outside of this process. |
| // For example, everything depends on mesh config, so we use it there rather than trying to sequence everything |
| // in AddStartFunc |
| internalStop chan struct{} |
| |
| statusReporter *distribution.Reporter |
| statusManager *status.Manager |
| // RWConfigStore is the configstore which allows updates, particularly for status. |
| RWConfigStore model.ConfigStoreController |
| |
| // ServiceMetadataServer |
| metadataServer *dubbov1alpha1.ServiceMetadataServer |
| |
| //Service Name mapping register |
| snpServer *dubbov1alpha1.Snp |
| } |
| |
| // NewServer creates a new Server instance based on the provided arguments. |
| func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) { |
| e := &model.Environment{ |
| PushContext: model.NewPushContext(), |
| DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix, |
| } |
| e.SetLedger(buildLedger(args.RegistryOptions)) |
| |
| ac := aggregate.NewController(aggregate.Options{ |
| MeshHolder: e, |
| }) |
| e.ServiceDiscovery = ac |
| |
| s := &Server{ |
| clusterID: getClusterID(args), |
| environment: e, |
| fileWatcher: filewatcher.NewWatcher(), |
| httpMux: http.NewServeMux(), |
| monitoringMux: http.NewServeMux(), |
| readinessProbes: make(map[string]readinessProbe), |
| workloadTrustBundle: tb.NewTrustBundle(nil), |
| server: server.New(), |
| shutdownDuration: args.ShutdownDuration, |
| internalStop: make(chan struct{}), |
| istiodCertBundleWatcher: keycertbundle.NewWatcher(), |
| } |
| // Apply custom initialization functions. |
| for _, fn := range initFuncs { |
| fn(s) |
| } |
| // Initialize workload Trust Bundle before XDS Server |
| e.TrustBundle = s.workloadTrustBundle |
| s.XDSServer = xds.NewDiscoveryServer(e, args.PodName, args.RegistryOptions.KubeOptions.ClusterAliases) |
| |
| prometheus.EnableHandlingTimeHistogram() |
| |
| // Apply the arguments to the configuration. |
| if err := s.initKubeClient(args); err != nil { |
| return nil, fmt.Errorf("error initializing kube client: %v", err) |
| } |
| |
| // used for both initKubeRegistry and initClusterRegistries |
| args.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient) |
| |
| s.initMeshConfiguration(args, s.fileWatcher) |
| spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain()) |
| |
| s.initMeshNetworks(args, s.fileWatcher) |
| s.initMeshHandlers() |
| s.environment.Init() |
| if err := s.environment.InitNetworksManager(s.XDSServer); err != nil { |
| return nil, err |
| } |
| |
| // Options based on the current 'defaults' in istio. |
| caOpts := &caOptions{ |
| TrustDomain: s.environment.Mesh().TrustDomain, |
| Namespace: args.Namespace, |
| ExternalCAType: ra.CaExternalType(externalCaType), |
| CertSignerDomain: features.CertSignerDomain, |
| } |
| |
| if caOpts.ExternalCAType == ra.ExtCAK8s { |
| // Older environment variable preserved for backward compatibility |
| caOpts.ExternalCASigner = k8sSigner |
| } |
| // CA signing certificate must be created first if needed. |
| if err := s.maybeCreateCA(caOpts); err != nil { |
| return nil, err |
| } |
| |
| if err := s.initControllers(args); err != nil { |
| return nil, err |
| } |
| |
| s.XDSServer.InitGenerators(e, args.Namespace) |
| |
| // Initialize workloadTrustBundle after CA has been initialized |
| if err := s.initWorkloadTrustBundle(args); err != nil { |
| return nil, err |
| } |
| |
| // Parse and validate Istiod Address. |
| istiodHost, _, err := e.GetDiscoveryAddress() |
| if err != nil { |
| return nil, err |
| } |
| |
| // Create Istiod certs and setup watches. |
| if err := s.initIstiodCerts(args, string(istiodHost)); err != nil { |
| return nil, err |
| } |
| |
| if s.kubeClient != nil { |
| s.metadataServer = dubbov1alpha1.NewServiceMetadataServer(s.kubeClient) |
| } |
| |
| // Create Service Name mapping server |
| if s.kubeClient != nil { |
| s.snpServer = dubbov1alpha1.NewSnp(s.kubeClient) |
| } |
| |
| // Secure gRPC Server must be initialized after CA is created as may use a Citadel generated cert. |
| if err := s.initSecureDiscoveryService(args); err != nil { |
| return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err) |
| } |
| |
| var wh *inject.Webhook |
| // common https server for webhooks (e.g. injection, validation) |
| if s.kubeClient != nil { |
| s.initSecureWebhookServer(args) |
| wh, err = s.initSidecarInjector(args) |
| if err != nil { |
| return nil, fmt.Errorf("error initializing sidecar injector: %v", err) |
| } |
| if err := s.initConfigValidation(args); err != nil { |
| return nil, fmt.Errorf("error initializing config validator: %v", err) |
| } |
| } |
| |
| whc := func() map[string]string { |
| if wh != nil { |
| return wh.Config.RawTemplates |
| } |
| return map[string]string{} |
| } |
| |
| // Used for readiness, monitoring and debug handlers. |
| if err := s.initIstiodAdminServer(args, whc); err != nil { |
| return nil, fmt.Errorf("error initializing debug server: %v", err) |
| } |
| // This should be called only after controllers are initialized. |
| s.initRegistryEventHandlers() |
| |
| s.initDiscoveryService(args) |
| |
| s.initSDSServer() |
| |
| // Notice that the order of authenticators matters, since at runtime |
| // authenticators are activated sequentially and the first successful attempt |
| // is used as the authentication result. |
| authenticators := []security.Authenticator{ |
| &authenticate.ClientCertAuthenticator{}, |
| } |
| if args.JwtRule != "" { |
| jwtAuthn, err := initOIDC(args, s.environment.Mesh().TrustDomain) |
| if err != nil { |
| return nil, fmt.Errorf("error initializing OIDC: %v", err) |
| } |
| if jwtAuthn == nil { |
| return nil, fmt.Errorf("JWT authenticator is nil") |
| } |
| authenticators = append(authenticators, jwtAuthn) |
| } |
| // The k8s JWT authenticator requires the multicluster registry to be initialized, |
| // so we build it later. |
| authenticators = append(authenticators, |
| kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient, s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy)) |
| if features.XDSAuth { |
| s.XDSServer.Authenticators = authenticators |
| } |
| caOpts.Authenticators = authenticators |
| |
| // Start CA or RA server. This should be called after CA and Istiod certs have been created. |
| s.startCA(caOpts) |
| |
| // TODO: don't run this if galley is started, one ctlz is enough |
| if args.CtrlZOptions != nil { |
| _, _ = ctrlz.Run(args.CtrlZOptions, nil) |
| } |
| |
| // This must be last, otherwise we will not know which informers to register |
| if s.kubeClient != nil { |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| s.kubeClient.RunAndWait(stop) |
| return nil |
| }) |
| } |
| |
| s.addReadinessProbe("discovery", func() (bool, error) { |
| return s.XDSServer.IsServerReady(), nil |
| }) |
| |
| return s, nil |
| } |
| |
| func initOIDC(args *PilotArgs, trustDomain string) (security.Authenticator, error) { |
| // JWTRule is from the JWT_RULE environment variable. |
| // An example of json string for JWTRule is: |
| // `{"issuer": "foo", "jwks_uri": "baz", "audiences": ["aud1", "aud2"]}`. |
| jwtRule := &v1beta1.JWTRule{} |
| err := json.Unmarshal([]byte(args.JwtRule), jwtRule) |
| if err != nil { |
| return nil, fmt.Errorf("failed to unmarshal JWT rule: %v", err) |
| } |
| log.Infof("Istiod authenticating using JWTRule: %v", jwtRule) |
| jwtAuthn, err := authenticate.NewJwtAuthenticator(jwtRule, trustDomain) |
| if err != nil { |
| return nil, fmt.Errorf("failed to create the JWT authenticator: %v", err) |
| } |
| return jwtAuthn, nil |
| } |
| |
| func getClusterID(args *PilotArgs) cluster.ID { |
| clusterID := args.RegistryOptions.KubeOptions.ClusterID |
| if clusterID == "" { |
| if hasKubeRegistry(args.RegistryOptions.Registries) { |
| clusterID = cluster.ID(provider.Kubernetes) |
| } |
| } |
| return clusterID |
| } |
| |
| func isUnexpectedListenerError(err error) bool { |
| if err == nil { |
| return false |
| } |
| if errors.Is(err, net.ErrClosed) { |
| return false |
| } |
| if errors.Is(err, http.ErrServerClosed) { |
| return false |
| } |
| return true |
| } |
| |
| // Start starts all components of the Pilot discovery service on the port specified in DiscoveryServerOptions. |
| // If Port == 0, a port number is automatically chosen. Content serving is started by this method, |
| // but is executed asynchronously. Serving can be canceled at any time by closing the provided stop channel. |
| func (s *Server) Start(stop <-chan struct{}) error { |
| log.Infof("Starting Istiod Server with primary cluster %s", s.clusterID) |
| |
| if features.UnsafeFeaturesEnabled() { |
| log.Warn("Server is starting with unsafe features enabled") |
| } |
| |
| // Now start all of the components. |
| if err := s.server.Start(stop); err != nil { |
| return err |
| } |
| if !s.waitForCacheSync(stop) { |
| return fmt.Errorf("failed to sync cache") |
| } |
| // Inform Discovery Server so that it can start accepting connections. |
| s.XDSServer.CachesSynced() |
| |
| // Race condition - if waitForCache is too fast and we run this as a startup function, |
| // the grpc server would be started before CA is registered. Listening should be last. |
| if s.secureGrpcAddress != "" { |
| grpcListener, err := net.Listen("tcp", s.secureGrpcAddress) |
| if err != nil { |
| return err |
| } |
| go func() { |
| log.Infof("starting secure gRPC discovery service at %s", grpcListener.Addr()) |
| if err := s.secureGrpcServer.Serve(grpcListener); err != nil { |
| log.Errorf("error serving secure GRPC server: %v", err) |
| } |
| }() |
| } |
| |
| if s.grpcAddress != "" { |
| grpcListener, err := net.Listen("tcp", s.grpcAddress) |
| if err != nil { |
| return err |
| } |
| go func() { |
| log.Infof("starting gRPC discovery service at %s", grpcListener.Addr()) |
| if err := s.grpcServer.Serve(grpcListener); err != nil { |
| log.Errorf("error serving GRPC server: %v", err) |
| } |
| }() |
| } |
| |
| if s.MultiplexGRPC { |
| log.Infof("multiplexing gRPC services with HTTP services") |
| h2s := &http2.Server{ |
| MaxConcurrentStreams: uint32(features.MaxConcurrentStreams), |
| } |
| // In the past, we have tried using "cmux" to handle multiplexing. This only works if we have |
| // only HTTP/1.1 and gRPC on the same port. If we have gRPC and HTTP2, clients (envoy) may |
| // multiplex the connections. cmux works at the connection level, so if the first request is |
| // gRPC then all future non-GRPC HTTP2 requests will match the gRPC server and fail. The major |
| // downside of multiplexing by using gRPC's ServeHTTP is that we are using the golang HTTP2 |
| // stack. This means a lot of features on the gRPC server (keepalives, etc) do not apply. |
| multiplexHandler := h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
| // If we detect gRPC, serve using grpcServer |
| if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("content-type"), "application/grpc") { |
| s.grpcServer.ServeHTTP(w, r) |
| return |
| } |
| // Otherwise, this is meant for the standard HTTP server |
| s.httpMux.ServeHTTP(w, r) |
| }), h2s) |
| s.httpServer.Handler = multiplexHandler |
| } |
| |
| // At this point we are ready - start Http Listener so that it can respond to readiness events. |
| httpListener, err := net.Listen("tcp", s.httpServer.Addr) |
| if err != nil { |
| return err |
| } |
| go func() { |
| log.Infof("starting HTTP service at %s", httpListener.Addr()) |
| if err := s.httpServer.Serve(httpListener); isUnexpectedListenerError(err) { |
| log.Errorf("error serving http server: %v", err) |
| } |
| }() |
| |
| if s.httpsServer != nil { |
| httpsListener, err := net.Listen("tcp", s.httpsServer.Addr) |
| if err != nil { |
| return err |
| } |
| go func() { |
| log.Infof("starting webhook service at %s", httpsListener.Addr()) |
| if err := s.httpsServer.ServeTLS(httpsListener, "", ""); isUnexpectedListenerError(err) { |
| log.Errorf("error serving https server: %v", err) |
| } |
| }() |
| } |
| |
| s.waitForShutdown(stop) |
| |
| return nil |
| } |
| |
| // WaitUntilCompletion waits for everything marked as a "required termination" to complete. |
| // This should be called before exiting. |
| func (s *Server) WaitUntilCompletion() { |
| s.server.Wait() |
| } |
| |
| // initSDSServer starts the SDS server |
| func (s *Server) initSDSServer() { |
| if s.kubeClient == nil { |
| return |
| } |
| if !features.EnableXDSIdentityCheck { |
| // Make sure we have security |
| log.Warnf("skipping Kubernetes credential reader; PILOT_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.") |
| } else { |
| creds := kubecredentials.NewMulticluster(s.clusterID) |
| creds.AddSecretHandler(func(name string, namespace string) { |
| s.XDSServer.ConfigUpdate(&model.PushRequest{ |
| Full: false, |
| ConfigsUpdated: map[model.ConfigKey]struct{}{ |
| { |
| Kind: gvk.Secret, |
| Name: name, |
| Namespace: namespace, |
| }: {}, |
| }, |
| Reason: []model.TriggerReason{model.SecretTrigger}, |
| }) |
| }) |
| s.XDSServer.Generators[v3.SecretType] = xds.NewSecretGen(creds, s.XDSServer.Cache, s.clusterID, s.environment.Mesh()) |
| s.multiclusterController.AddHandler(creds) |
| if ecdsGen, found := s.XDSServer.Generators[v3.ExtensionConfigurationType]; found { |
| ecdsGen.(*xds.EcdsGenerator).SetCredController(creds) |
| } |
| } |
| } |
| |
| // initKubeClient creates the k8s client if running in an k8s environment. |
| // This is determined by the presence of a kube registry, which |
| // uses in-context k8s, or a config source of type k8s. |
| func (s *Server) initKubeClient(args *PilotArgs) error { |
| if s.kubeClient != nil { |
| // Already initialized by startup arguments |
| return nil |
| } |
| hasK8SConfigStore := false |
| if args.RegistryOptions.FileDir == "" { |
| // If file dir is set - config controller will just use file. |
| if _, err := os.Stat(args.MeshConfigFile); !os.IsNotExist(err) { |
| meshConfig, err := mesh.ReadMeshConfig(args.MeshConfigFile) |
| if err != nil { |
| return fmt.Errorf("failed reading mesh config: %v", err) |
| } |
| if len(meshConfig.ConfigSources) == 0 && args.RegistryOptions.KubeConfig != "" { |
| hasK8SConfigStore = true |
| } |
| for _, cs := range meshConfig.ConfigSources { |
| if cs.Address == string(Kubernetes)+"://" { |
| hasK8SConfigStore = true |
| break |
| } |
| } |
| } else if args.RegistryOptions.KubeConfig != "" { |
| hasK8SConfigStore = true |
| } |
| } |
| |
| if hasK8SConfigStore || hasKubeRegistry(args.RegistryOptions.Registries) { |
| // Used by validation |
| kubeRestConfig, err := kubelib.DefaultRestConfig(args.RegistryOptions.KubeConfig, "", func(config *rest.Config) { |
| config.QPS = args.RegistryOptions.KubeOptions.KubernetesAPIQPS |
| config.Burst = args.RegistryOptions.KubeOptions.KubernetesAPIBurst |
| }) |
| if err != nil { |
| return fmt.Errorf("failed creating kube config: %v", err) |
| } |
| |
| s.kubeClient, err = kubelib.NewClient(kubelib.NewClientConfigForRestConfig(kubeRestConfig)) |
| if err != nil { |
| return fmt.Errorf("failed creating kube client: %v", err) |
| } |
| } |
| |
| return nil |
| } |
| |
| // A single container can't have two readiness probes. Make this readiness probe a generic one |
| // that can handle all istiod related readiness checks including webhook, gRPC etc. |
| // The "http" portion of the readiness check is satisfied by the fact we've started listening on |
| // this handler and everything has already initialized. |
| func (s *Server) istiodReadyHandler(w http.ResponseWriter, _ *http.Request) { |
| for name, fn := range s.readinessProbes { |
| if ready, err := fn(); !ready { |
| log.Warnf("%s is not ready: %v", name, err) |
| w.WriteHeader(http.StatusServiceUnavailable) |
| return |
| } |
| } |
| w.WriteHeader(http.StatusOK) |
| } |
| |
| // initIstiodAdminServer initializes monitoring, debug and readiness end points. |
| func (s *Server) initIstiodAdminServer(args *PilotArgs, whc func() map[string]string) error { |
| s.httpServer = &http.Server{ |
| Addr: args.ServerOptions.HTTPAddr, |
| Handler: s.httpMux, |
| IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout |
| ReadTimeout: 30 * time.Second, |
| } |
| |
| shouldMultiplex := args.ServerOptions.MonitoringAddr == "" |
| |
| if shouldMultiplex { |
| s.monitoringMux = s.httpMux |
| log.Info("initializing Istiod admin server multiplexed on httpAddr ", s.httpServer.Addr) |
| } else { |
| log.Info("initializing Istiod admin server") |
| } |
| |
| // Debug Server. |
| s.XDSServer.InitDebug(s.monitoringMux, s.ServiceController(), args.ServerOptions.EnableProfiling, whc) |
| |
| // Debug handlers are currently added on monitoring mux and readiness mux. |
| // If monitoring addr is empty, the mux is shared and we only add it once on the shared mux . |
| if !shouldMultiplex { |
| s.XDSServer.AddDebugHandlers(s.httpMux, nil, args.ServerOptions.EnableProfiling, whc) |
| } |
| |
| // Monitoring Server. |
| if err := s.initMonitor(args.ServerOptions.MonitoringAddr); err != nil { |
| return fmt.Errorf("error initializing monitor: %v", err) |
| } |
| |
| // Readiness Handler. |
| s.httpMux.HandleFunc("/ready", s.istiodReadyHandler) |
| |
| return nil |
| } |
| |
| // initDiscoveryService initializes discovery server on plain text port. |
| func (s *Server) initDiscoveryService(args *PilotArgs) { |
| log.Infof("starting discovery service") |
| // Implement EnvoyXdsServer grace shutdown |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| log.Infof("Starting ADS server") |
| s.XDSServer.Start(stop) |
| return nil |
| }) |
| |
| // Implement ServiceNameMapping grace shutdown |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| log.Infof("Starting SNP server") |
| s.snpServer.Start(stop) |
| return nil |
| }) |
| |
| // Implement ServiceNameMapping grace shutdown |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| log.Infof("Starting MetaData server") |
| s.metadataServer.Start(stop) |
| return nil |
| }) |
| |
| s.initGrpcServer(args.KeepaliveOptions) |
| |
| if args.ServerOptions.GRPCAddr != "" { |
| s.grpcAddress = args.ServerOptions.GRPCAddr |
| } else { |
| // This happens only if the GRPC port (15010) is disabled. We will multiplex |
| // it on the HTTP port. Does not impact the HTTPS gRPC or HTTPS. |
| log.Info("multiplexing gRPC on http addr ", args.ServerOptions.HTTPAddr) |
| s.MultiplexGRPC = true |
| } |
| } |
| |
| // Wait for the stop, and do cleanups |
| func (s *Server) waitForShutdown(stop <-chan struct{}) { |
| go func() { |
| <-stop |
| close(s.internalStop) |
| _ = s.fileWatcher.Close() |
| |
| if s.cacertsWatcher != nil { |
| _ = s.cacertsWatcher.Close() |
| } |
| // Stop gRPC services. If gRPC services fail to stop in the shutdown duration, |
| // force stop them. This does not happen normally. |
| stopped := make(chan struct{}) |
| go func() { |
| // Some grpcServer implementations do not support GracefulStop. Unfortunately, this is not |
| // exposed; they just panic. To avoid this, we will recover and do a standard Stop when its not |
| // support. |
| defer func() { |
| if r := recover(); r != nil { |
| s.grpcServer.Stop() |
| if s.secureGrpcServer != nil { |
| s.secureGrpcServer.Stop() |
| } |
| close(stopped) |
| } |
| }() |
| s.grpcServer.GracefulStop() |
| if s.secureGrpcServer != nil { |
| s.secureGrpcServer.GracefulStop() |
| } |
| close(stopped) |
| }() |
| |
| t := time.NewTimer(s.shutdownDuration) |
| select { |
| case <-t.C: |
| s.grpcServer.Stop() |
| if s.secureGrpcServer != nil { |
| s.secureGrpcServer.Stop() |
| } |
| case <-stopped: |
| t.Stop() |
| } |
| |
| // Stop HTTP services. |
| ctx, cancel := context.WithTimeout(context.Background(), s.shutdownDuration) |
| defer cancel() |
| if err := s.httpServer.Shutdown(ctx); err != nil { |
| log.Warn(err) |
| } |
| if s.httpsServer != nil { |
| if err := s.httpsServer.Shutdown(ctx); err != nil { |
| log.Warn(err) |
| } |
| } |
| |
| // Shutdown the DiscoveryServer. |
| s.XDSServer.Shutdown() |
| }() |
| } |
| |
| func (s *Server) initGrpcServer(options *istiokeepalive.Options) { |
| interceptors := []grpc.UnaryServerInterceptor{ |
| // setup server prometheus monitoring (as final interceptor in chain) |
| prometheus.UnaryServerInterceptor, |
| } |
| grpcOptions := istiogrpc.ServerOptions(options, interceptors...) |
| s.grpcServer = grpc.NewServer(grpcOptions...) |
| s.XDSServer.Register(s.grpcServer) |
| reflection.Register(s.grpcServer) |
| s.metadataServer.Register(s.grpcServer) |
| s.snpServer.Register(s.grpcServer) |
| } |
| |
| // initialize secureGRPCServer. |
| func (s *Server) initSecureDiscoveryService(args *PilotArgs) error { |
| if args.ServerOptions.SecureGRPCAddr == "" { |
| log.Info("The secure discovery port is disabled, multiplexing on httpAddr ") |
| return nil |
| } |
| |
| peerCertVerifier, err := s.createPeerCertVerifier(args.ServerOptions.TLSOptions) |
| if err != nil { |
| return err |
| } |
| if peerCertVerifier == nil { |
| // Running locally without configured certs - no TLS mode |
| log.Warnf("The secure discovery service is disabled") |
| return nil |
| } |
| log.Info("initializing secure discovery service") |
| cfg := &tls.Config{ |
| GetCertificate: s.getIstiodCertificate, |
| ClientAuth: tls.VerifyClientCertIfGiven, |
| ClientCAs: peerCertVerifier.GetGeneralCertPool(), |
| VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error { |
| err := peerCertVerifier.VerifyPeerCert(rawCerts, verifiedChains) |
| if err != nil { |
| log.Infof("Could not verify certificate: %v", err) |
| } |
| return err |
| }, |
| MinVersion: tls.VersionTLS12, |
| CipherSuites: args.ServerOptions.TLSOptions.CipherSuits, |
| } |
| |
| tlsCreds := credentials.NewTLS(cfg) |
| |
| s.secureGrpcAddress = args.ServerOptions.SecureGRPCAddr |
| |
| interceptors := []grpc.UnaryServerInterceptor{ |
| // setup server prometheus monitoring (as final interceptor in chain) |
| prometheus.UnaryServerInterceptor, |
| } |
| opts := istiogrpc.ServerOptions(args.KeepaliveOptions, interceptors...) |
| opts = append(opts, grpc.Creds(tlsCreds)) |
| |
| s.secureGrpcServer = grpc.NewServer(opts...) |
| s.XDSServer.Register(s.secureGrpcServer) |
| reflection.Register(s.secureGrpcServer) |
| s.metadataServer.Register(s.secureGrpcServer) |
| s.snpServer.Register(s.secureGrpcServer) |
| |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| go func() { |
| <-stop |
| s.secureGrpcServer.Stop() |
| }() |
| return nil |
| }) |
| |
| return nil |
| } |
| |
| // addStartFunc appends a function to be run. These are run synchronously in order, |
| // so the function should start a go routine if it needs to do anything blocking |
| func (s *Server) addStartFunc(fn server.Component) { |
| s.server.RunComponent(fn) |
| } |
| |
| // adds a readiness probe for Istiod Server. |
| func (s *Server) addReadinessProbe(name string, fn readinessProbe) { |
| s.readinessProbes[name] = fn |
| } |
| |
| // addTerminatingStartFunc adds a function that should terminate before the serve shuts down |
| // This is useful to do cleanup activities |
| // This is does not guarantee they will terminate gracefully - best effort only |
| // Function should be synchronous; once it returns it is considered "done" |
| func (s *Server) addTerminatingStartFunc(fn server.Component) { |
| s.server.RunComponentAsyncAndWait(fn) |
| } |
| |
| func (s *Server) waitForCacheSync(stop <-chan struct{}) bool { |
| start := time.Now() |
| log.Info("Waiting for caches to be synced") |
| if !cache.WaitForCacheSync(stop, s.cachesSynced) { |
| log.Errorf("Failed waiting for cache sync") |
| return false |
| } |
| log.Infof("All controller caches have been synced up in %v", time.Since(start)) |
| |
| // At this point, we know that all update events of the initial state-of-the-world have been |
| // received. We wait to ensure we have committed at least this many updates. This avoids a race |
| // condition where we are marked ready prior to updating the push context, leading to incomplete |
| // pushes. |
| expected := s.XDSServer.InboundUpdates.Load() |
| if !cache.WaitForCacheSync(stop, func() bool { return s.pushContextReady(expected) }) { |
| log.Errorf("Failed waiting for push context initialization") |
| return false |
| } |
| |
| return true |
| } |
| |
| // pushContextReady indicates whether pushcontext has processed all inbound config updates. |
| func (s *Server) pushContextReady(expected int64) bool { |
| committed := s.XDSServer.CommittedUpdates.Load() |
| if committed < expected { |
| log.Debugf("Waiting for pushcontext to process inbound updates, inbound: %v, committed : %v", expected, committed) |
| return false |
| } |
| return true |
| } |
| |
| // cachesSynced checks whether caches have been synced. |
| func (s *Server) cachesSynced() bool { |
| if s.multiclusterController != nil && !s.multiclusterController.HasSynced() { |
| return false |
| } |
| if !s.ServiceController().HasSynced() { |
| return false |
| } |
| if !s.configController.HasSynced() { |
| return false |
| } |
| return true |
| } |
| |
| // initRegistryEventHandlers sets up event handlers for config and service updates |
| func (s *Server) initRegistryEventHandlers() { |
| log.Info("initializing registry event handlers") |
| // Flush cached discovery responses whenever services configuration change. |
| serviceHandler := func(svc *model.Service, _ model.Event) { |
| pushReq := &model.PushRequest{ |
| Full: true, |
| ConfigsUpdated: map[model.ConfigKey]struct{}{{ |
| Kind: gvk.ServiceEntry, |
| Name: string(svc.Hostname), |
| Namespace: svc.Attributes.Namespace, |
| }: {}}, |
| Reason: []model.TriggerReason{model.ServiceUpdate}, |
| } |
| s.XDSServer.ConfigUpdate(pushReq) |
| } |
| s.ServiceController().AppendServiceHandler(serviceHandler) |
| |
| if s.configController != nil { |
| configHandler := func(prev config.Config, curr config.Config, event model.Event) { |
| defer func() { |
| if event != model.EventDelete { |
| s.statusReporter.AddInProgressResource(curr) |
| } else { |
| s.statusReporter.DeleteInProgressResource(curr) |
| } |
| }() |
| // For update events, trigger push only if spec has changed. |
| if event == model.EventUpdate && !needsPush(prev, curr) { |
| log.Debugf("skipping push for %s as spec has not changed", prev.Key()) |
| return |
| } |
| pushReq := &model.PushRequest{ |
| Full: true, |
| ConfigsUpdated: map[model.ConfigKey]struct{}{{ |
| Kind: curr.GroupVersionKind, |
| Name: curr.Name, |
| Namespace: curr.Namespace, |
| }: {}}, |
| Reason: []model.TriggerReason{model.ConfigUpdate}, |
| } |
| s.XDSServer.ConfigUpdate(pushReq) |
| } |
| schemas := collections.Pilot.All() |
| if features.EnableGatewayAPI { |
| schemas = collections.PilotGatewayAPI.All() |
| } |
| for _, schema := range schemas { |
| // This resource type was handled in external/servicediscovery.go, no need to rehandle here. |
| if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Serviceentries. |
| Resource().GroupVersionKind() { |
| continue |
| } |
| if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadentries. |
| Resource().GroupVersionKind() { |
| continue |
| } |
| if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadgroups. |
| Resource().GroupVersionKind() { |
| continue |
| } |
| |
| s.configController.RegisterEventHandler(schema.Resource().GroupVersionKind(), configHandler) |
| } |
| if s.environment.GatewayAPIController != nil { |
| s.environment.GatewayAPIController.RegisterEventHandler(gvk.Namespace, func(config.Config, config.Config, model.Event) { |
| s.XDSServer.ConfigUpdate(&model.PushRequest{ |
| Full: true, |
| Reason: []model.TriggerReason{model.NamespaceUpdate}, |
| }) |
| }) |
| } |
| } |
| } |
| |
| func (s *Server) initIstiodCertLoader() error { |
| if err := s.loadIstiodCert(); err != nil { |
| return fmt.Errorf("first time load IstiodCert failed: %v", err) |
| } |
| _, watchCh := s.istiodCertBundleWatcher.AddWatcher() |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| go s.reloadIstiodCert(watchCh, stop) |
| return nil |
| }) |
| return nil |
| } |
| |
| // initIstiodCerts creates Istiod certificates and also sets up watches to them. |
| func (s *Server) initIstiodCerts(args *PilotArgs, host string) error { |
| // Skip all certificates |
| var err error |
| |
| // Append custom hostname if there is any |
| customHost := features.IstiodServiceCustomHost |
| s.dnsNames = []string{host} |
| cHosts := strings.Split(customHost, ",") |
| for _, cHost := range cHosts { |
| if cHost != "" && cHost != host { |
| log.Infof("Adding custom hostname %s", cHost) |
| s.dnsNames = append(s.dnsNames, cHost) |
| } |
| } |
| |
| // The first is the recommended one, also used by Apiserver for webhooks. |
| // add a few known hostnames |
| knownHosts := []string{"istiod", "istiod-remote", "istio-pilot"} |
| // In some conditions, pilot address for sds is different from other xds, |
| // like multi-cluster primary-remote mode with revision. |
| if args.Revision != "" && args.Revision != "dubbo" { |
| knownHosts = append(knownHosts, "istiod"+"-"+args.Revision) |
| } |
| |
| for _, altName := range knownHosts { |
| name := fmt.Sprintf("%v.%v.svc", altName, args.Namespace) |
| exist := false |
| for _, cHost := range cHosts { |
| if name == host || name == cHost { |
| exist = true |
| } |
| } |
| if !exist { |
| s.dnsNames = append(s.dnsNames, name) |
| } |
| } |
| |
| if hasCustomTLSCerts(args.ServerOptions.TLSOptions) { |
| // Use the DNS certificate provided via args. |
| err = s.initCertificateWatches(args.ServerOptions.TLSOptions) |
| if err != nil { |
| // Not crashing istiod - This typically happens if certs are missing and in tests. |
| log.Errorf("error initializing certificate watches: %v", err) |
| return nil |
| } |
| err = s.initIstiodCertLoader() |
| } else if features.PilotCertProvider == constants.CertProviderNone { |
| return nil |
| } else if s.EnableCA() && features.PilotCertProvider == constants.CertProviderIstiod { |
| log.Infof("initializing Istiod DNS certificates host: %s, custom host: %s", host, features.IstiodServiceCustomHost) |
| err = s.initDNSCerts(host, args.Namespace) |
| if err == nil { |
| err = s.initIstiodCertLoader() |
| } |
| } else if features.PilotCertProvider == constants.CertProviderKubernetes { |
| log.Infof("initializing Istiod DNS certificates host: %s, custom host: %s", host, features.IstiodServiceCustomHost) |
| err = s.initDNSCerts(host, args.Namespace) |
| if err == nil { |
| err = s.initIstiodCertLoader() |
| } |
| } else if strings.HasPrefix(features.PilotCertProvider, constants.CertProviderKubernetesSignerPrefix) { |
| log.Infof("initializing Istiod DNS certificates host: %s, custom host: %s", host, features.IstiodServiceCustomHost) |
| err = s.initDNSCerts(host, args.Namespace) |
| if err == nil { |
| err = s.initIstiodCertLoader() |
| } |
| } |
| |
| return err |
| } |
| |
| // createPeerCertVerifier creates a SPIFFE certificate verifier with the current istiod configuration. |
| func (s *Server) createPeerCertVerifier(tlsOptions TLSOptions) (*spiffe.PeerCertVerifier, error) { |
| if tlsOptions.CaCertFile == "" && s.CA == nil && features.SpiffeBundleEndpoints == "" && !s.isDisableCa() { |
| // Running locally without configured certs - no TLS mode |
| return nil, nil |
| } |
| peerCertVerifier := spiffe.NewPeerCertVerifier() |
| var rootCertBytes []byte |
| var err error |
| if tlsOptions.CaCertFile != "" { |
| if rootCertBytes, err = os.ReadFile(tlsOptions.CaCertFile); err != nil { |
| return nil, err |
| } |
| } else { |
| if s.RA != nil { |
| if strings.HasPrefix(features.PilotCertProvider, constants.CertProviderKubernetesSignerPrefix) { |
| signerName := strings.TrimPrefix(features.PilotCertProvider, constants.CertProviderKubernetesSignerPrefix) |
| caBundle, _ := s.RA.GetRootCertFromMeshConfig(signerName) |
| rootCertBytes = append(rootCertBytes, caBundle...) |
| } else { |
| rootCertBytes = append(rootCertBytes, s.RA.GetCAKeyCertBundle().GetRootCertPem()...) |
| } |
| } |
| if s.CA != nil { |
| rootCertBytes = append(rootCertBytes, s.CA.GetCAKeyCertBundle().GetRootCertPem()...) |
| } |
| } |
| |
| if len(rootCertBytes) != 0 { |
| err := peerCertVerifier.AddMappingFromPEM(spiffe.GetTrustDomain(), rootCertBytes) |
| if err != nil { |
| return nil, fmt.Errorf("add root CAs into peerCertVerifier failed: %v", err) |
| } |
| } |
| |
| if features.SpiffeBundleEndpoints != "" { |
| certMap, err := spiffe.RetrieveSpiffeBundleRootCertsFromStringInput( |
| features.SpiffeBundleEndpoints, []*x509.Certificate{}) |
| if err != nil { |
| return nil, err |
| } |
| peerCertVerifier.AddMappings(certMap) |
| } |
| |
| return peerCertVerifier, nil |
| } |
| |
| // hasCustomTLSCerts returns true if custom TLS certificates are configured via args. |
| func hasCustomTLSCerts(tlsOptions TLSOptions) bool { |
| return tlsOptions.CaCertFile != "" && tlsOptions.CertFile != "" && tlsOptions.KeyFile != "" |
| } |
| |
| // getIstiodCertificate returns the istiod certificate. |
| func (s *Server) getIstiodCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) { |
| s.certMu.RLock() |
| defer s.certMu.RUnlock() |
| if s.istiodCert != nil { |
| return s.istiodCert, nil |
| } |
| return nil, fmt.Errorf("cert not initialized") |
| } |
| |
| // initControllers initializes the controllers. |
| func (s *Server) initControllers(args *PilotArgs) error { |
| log.Info("initializing controllers") |
| s.initMulticluster(args) |
| // Certificate controller is created before MCP controller in case MCP server pod |
| // waits to mount a certificate to be provisioned by the certificate controller. |
| if err := s.initCertController(args); err != nil { |
| return fmt.Errorf("error initializing certificate controller: %v", err) |
| } |
| if err := s.initConfigController(args); err != nil { |
| return fmt.Errorf("error initializing config controller: %v", err) |
| } |
| if err := s.initServiceControllers(args); err != nil { |
| return fmt.Errorf("error initializing service controllers: %v", err) |
| } |
| return nil |
| } |
| |
| func (s *Server) initMulticluster(args *PilotArgs) { |
| if s.kubeClient == nil { |
| return |
| } |
| s.multiclusterController = multicluster.NewController(s.kubeClient, args.Namespace, s.clusterID) |
| s.XDSServer.ListRemoteClusters = s.multiclusterController.ListRemoteClusters |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| return s.multiclusterController.Run(stop) |
| }) |
| } |
| |
| // maybeCreateCA creates and initializes CA Key if needed. |
| func (s *Server) maybeCreateCA(caOpts *caOptions) error { |
| // CA signing certificate must be created only if CA is enabled. |
| if s.EnableCA() { |
| log.Info("creating CA and initializing public key") |
| var err error |
| var corev1 v1.CoreV1Interface |
| if s.kubeClient != nil { |
| corev1 = s.kubeClient.CoreV1() |
| } |
| if useRemoteCerts.Get() { |
| if err = s.loadRemoteCACerts(caOpts, LocalCertDir.Get()); err != nil { |
| return fmt.Errorf("failed to load remote CA certs: %v", err) |
| } |
| } |
| // May return nil, if the CA is missing required configs - This is not an error. |
| if caOpts.ExternalCAType != "" { |
| if s.RA, err = s.createIstioRA(s.kubeClient, caOpts); err != nil { |
| return fmt.Errorf("failed to create RA: %v", err) |
| } |
| } |
| if !s.isDisableCa() { |
| if s.CA, err = s.createIstioCA(corev1, caOpts); err != nil { |
| return fmt.Errorf("failed to create CA: %v", err) |
| } |
| } |
| |
| } |
| return nil |
| } |
| |
| func (s *Server) shouldStartNsController() bool { |
| if s.isDisableCa() { |
| return true |
| } |
| if s.CA == nil { |
| return false |
| } |
| |
| // For Kubernetes CA, we don't distribute it; it is mounted in all pods by Kubernetes. |
| if features.PilotCertProvider == constants.CertProviderKubernetes { |
| return false |
| } |
| // For no CA we don't distribute it either, as there is no cert |
| if features.PilotCertProvider == constants.CertProviderNone { |
| return false |
| } |
| |
| return true |
| } |
| |
| // StartCA starts the CA or RA server if configured. |
| func (s *Server) startCA(caOpts *caOptions) { |
| if s.CA == nil && s.RA == nil { |
| return |
| } |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| grpcServer := s.secureGrpcServer |
| if s.secureGrpcServer == nil { |
| grpcServer = s.grpcServer |
| } |
| // Start the RA server if configured, else start the CA server |
| if s.RA != nil { |
| log.Infof("Starting RA") |
| s.RunCA(grpcServer, s.RA, caOpts) |
| } else if s.CA != nil { |
| log.Infof("Starting IstioD CA") |
| s.RunCA(grpcServer, s.CA, caOpts) |
| } |
| return nil |
| }) |
| } |
| |
| // initMeshHandlers initializes mesh and network handlers. |
| func (s *Server) initMeshHandlers() { |
| log.Info("initializing mesh handlers") |
| // When the mesh config or networks change, do a full push. |
| s.environment.AddMeshHandler(func() { |
| spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain()) |
| s.XDSServer.ConfigGenerator.MeshConfigChanged(s.environment.Mesh()) |
| s.XDSServer.ConfigUpdate(&model.PushRequest{ |
| Full: true, |
| Reason: []model.TriggerReason{model.GlobalUpdate}, |
| }) |
| }) |
| } |
| |
| func (s *Server) addIstioCAToTrustBundle(args *PilotArgs) error { |
| var err error |
| if s.CA != nil { |
| // If IstioCA is setup, derive trustAnchor directly from CA |
| rootCerts := []string{string(s.CA.GetCAKeyCertBundle().GetRootCertPem())} |
| err = s.workloadTrustBundle.UpdateTrustAnchor(&tb.TrustAnchorUpdate{ |
| TrustAnchorConfig: tb.TrustAnchorConfig{Certs: rootCerts}, |
| Source: tb.SourceIstioCA, |
| }) |
| if err != nil { |
| log.Errorf("unable to add CA root from namespace %s as trustAnchor", args.Namespace) |
| return err |
| } |
| return nil |
| } |
| return nil |
| } |
| |
| func (s *Server) initWorkloadTrustBundle(args *PilotArgs) error { |
| var err error |
| |
| if !features.MultiRootMesh { |
| return nil |
| } |
| |
| s.workloadTrustBundle.UpdateCb(func() { |
| pushReq := &model.PushRequest{ |
| Full: true, |
| Reason: []model.TriggerReason{model.GlobalUpdate}, |
| } |
| s.XDSServer.ConfigUpdate(pushReq) |
| }) |
| |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| go s.workloadTrustBundle.ProcessRemoteTrustAnchors(stop, tb.RemoteDefaultPollPeriod) |
| return nil |
| }) |
| |
| // MeshConfig: Add initial roots |
| err = s.workloadTrustBundle.AddMeshConfigUpdate(s.environment.Mesh()) |
| if err != nil { |
| return err |
| } |
| |
| // MeshConfig:Add callback for mesh config update |
| s.environment.AddMeshHandler(func() { |
| _ = s.workloadTrustBundle.AddMeshConfigUpdate(s.environment.Mesh()) |
| }) |
| |
| err = s.addIstioCAToTrustBundle(args) |
| if err != nil { |
| return err |
| } |
| |
| // IstioRA: Explicitly add roots corresponding to RA |
| if s.RA != nil { |
| // Implicitly add the Istio RA certificates to the Workload Trust Bundle |
| rootCerts := []string{string(s.RA.GetCAKeyCertBundle().GetRootCertPem())} |
| err = s.workloadTrustBundle.UpdateTrustAnchor(&tb.TrustAnchorUpdate{ |
| TrustAnchorConfig: tb.TrustAnchorConfig{Certs: rootCerts}, |
| Source: tb.SourceIstioRA, |
| }) |
| if err != nil { |
| log.Errorf("fatal: unable to add RA root as trustAnchor") |
| return err |
| } |
| } |
| log.Infof("done initializing workload trustBundle") |
| return nil |
| } |
| |
| // isDisableCa returns whether CA functionality is disabled in istiod. |
| // It return true only if istiod certs is signed by Kubernetes and |
| // workload certs are signed by external CA |
| func (s *Server) isDisableCa() bool { |
| if s.RA != nil { |
| // do not create CA server if PilotCertProvider is `kubernetes` and RA server exists |
| if features.PilotCertProvider == constants.CertProviderKubernetes { |
| return true |
| } |
| // do not create CA server if PilotCertProvider is `k8s.io/*` and RA server exists |
| if strings.HasPrefix(features.PilotCertProvider, constants.CertProviderKubernetesSignerPrefix) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func (s *Server) initStatusManager(_ *PilotArgs) { |
| s.addStartFunc(func(stop <-chan struct{}) error { |
| s.statusManager = status.NewManager(s.RWConfigStore) |
| s.statusManager.Start(stop) |
| return nil |
| }) |
| } |