// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bootstrap
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
)
import (
"github.com/fsnotify/fsnotify"
prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
prom "github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
"istio.io/api/security/v1beta1"
"istio.io/pkg/ctrlz"
"istio.io/pkg/filewatcher"
"istio.io/pkg/log"
"istio.io/pkg/version"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
import (
kubecredentials "github.com/apache/dubbo-go-pixiu/pilot/pkg/credentials/kube"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
istiogrpc "github.com/apache/dubbo-go-pixiu/pilot/pkg/grpc"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/keycertbundle"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
dubbov1alpha1 "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/dubbo/v1alpha1"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/server"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate"
kubecontroller "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/serviceentry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/status"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/status/distribution"
tb "github.com/apache/dubbo-go-pixiu/pilot/pkg/trustbundle"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/constants"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
istiokeepalive "github.com/apache/dubbo-go-pixiu/pkg/keepalive"
kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/kube/inject"
"github.com/apache/dubbo-go-pixiu/pkg/kube/multicluster"
"github.com/apache/dubbo-go-pixiu/pkg/security"
"github.com/apache/dubbo-go-pixiu/pkg/spiffe"
"github.com/apache/dubbo-go-pixiu/security/pkg/k8s/chiron"
"github.com/apache/dubbo-go-pixiu/security/pkg/pki/ca"
"github.com/apache/dubbo-go-pixiu/security/pkg/pki/ra"
"github.com/apache/dubbo-go-pixiu/security/pkg/server/ca/authenticate"
"github.com/apache/dubbo-go-pixiu/security/pkg/server/ca/authenticate/kubeauth"
)
const (
	// watchDebounceDelay is how long file-watcher events are debounced to
	// minimize noise in logs.
	watchDebounceDelay = 100 * time.Millisecond
)
// init disables gRPC tracing and registers the pilot_info build-info gauge.
// Runs once at package load; MustRegister panics on duplicate registration.
func init() {
	// Disable gRPC tracing. It has performance impacts (See https://github.com/grpc/grpc-go/issues/695)
	grpc.EnableTracing = false
	// Export pilot version as metric for fleet analytics.
	pilotVersion := prom.NewGaugeVec(prom.GaugeOpts{
		Name: "pilot_info",
		Help: "Pilot version and build information.",
	}, []string{"version"})
	prom.MustRegister(pilotVersion)
	// Constant gauge set to 1; the build version is carried in the label.
	pilotVersion.With(prom.Labels{"version": version.Info.String()}).Set(1)
}
// readinessProbe defines a function that will be used to indicate whether a server is ready.
// It returns (false, reason) while not ready; the error is logged, not returned to clients.
type readinessProbe func() (bool, error)
// Server contains the runtime configuration for the Pilot discovery service.
type Server struct {
	XDSServer *xds.DiscoveryServer
	// clusterID identifies the primary cluster this instance serves.
	clusterID   cluster.ID
	environment *model.Environment
	// kubeClient is nil when running outside a Kubernetes environment.
	kubeClient             kubelib.Client
	multiclusterController *multicluster.Controller
	configController       model.ConfigStoreController
	ConfigStores           []model.ConfigStoreController
	serviceEntryController *serviceentry.Controller
	httpServer       *http.Server // debug, monitoring and readiness Server.
	httpsServer      *http.Server // webhooks HTTPS Server.
	httpsReadyClient *http.Client
	grpcServer        *grpc.Server
	grpcAddress       string
	secureGrpcServer  *grpc.Server
	secureGrpcAddress string
	// monitoringMux listens on monitoringAddr(:15014).
	// Currently runs prometheus monitoring and debug (if enabled).
	monitoringMux *http.ServeMux
	// httpMux listens on the httpAddr (8080).
	// If a Gateway is used in front and https is off it is also multiplexing
	// the rest of the features if their port is empty.
	// Currently runs readiness and debug (if enabled)
	httpMux *http.ServeMux
	// httpsMux listens on the httpsAddr(15017), handling webhooks
	// If the address is empty, the webhooks will be set on the default httpPort.
	httpsMux *http.ServeMux // webhooks
	// MultiplexGRPC will serve gRPC and HTTP (1 or 2) over the HTTPListener, if enabled.
	MultiplexGRPC bool
	// fileWatcher used to watch mesh config, networks and certificates.
	fileWatcher filewatcher.FileWatcher
	// cacertsWatcher watches the CA certificates for changes and triggers a notification to Istiod.
	cacertsWatcher *fsnotify.Watcher
	dnsNames       []string
	certController *chiron.WebhookController
	CA             *ca.IstioCA
	RA             ra.RegistrationAuthority
	// TrustAnchors for workload to workload mTLS
	workloadTrustBundle *tb.TrustBundle
	// certMu guards istiodCert, which is hot-reloaded by the cert watcher.
	certMu                  sync.RWMutex
	istiodCert              *tls.Certificate
	istiodCertBundleWatcher *keycertbundle.Watcher
	server                  server.Instance
	readinessProbes map[string]readinessProbe
	// duration used for graceful shutdown.
	shutdownDuration time.Duration
	// internalStop is closed when the server is shutdown. This should be avoided as much as possible, in
	// favor of AddStartFunc. This is only required if we *must* start something outside of this process.
	// For example, everything depends on mesh config, so we use it there rather than trying to sequence everything
	// in AddStartFunc
	internalStop chan struct{}
	statusReporter *distribution.Reporter
	statusManager  *status.Manager
	// RWConfigStore is the configstore which allows updates, particularly for status.
	RWConfigStore model.ConfigStoreController
	// metadataServer serves dubbo service metadata; only constructed when a
	// kube client is available (see NewServer).
	metadataServer *dubbov1alpha1.ServiceMetadataServer
	// snpServer implements Service Name mapping registration; only constructed
	// when a kube client is available (see NewServer).
	snpServer *dubbov1alpha1.Snp
}
// NewServer creates a new Server instance based on the provided arguments.
// It wires the environment, config/service controllers, CA, certificates,
// webhooks and discovery services in a strict order; several components are
// only created when a Kubernetes client is available.
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
	e := &model.Environment{
		PushContext:  model.NewPushContext(),
		DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
	}
	e.SetLedger(buildLedger(args.RegistryOptions))
	// The aggregate controller merges all service registries behind one API.
	ac := aggregate.NewController(aggregate.Options{
		MeshHolder: e,
	})
	e.ServiceDiscovery = ac
	s := &Server{
		clusterID:               getClusterID(args),
		environment:             e,
		fileWatcher:             filewatcher.NewWatcher(),
		httpMux:                 http.NewServeMux(),
		monitoringMux:           http.NewServeMux(),
		readinessProbes:         make(map[string]readinessProbe),
		workloadTrustBundle:     tb.NewTrustBundle(nil),
		server:                  server.New(),
		shutdownDuration:        args.ShutdownDuration,
		internalStop:            make(chan struct{}),
		istiodCertBundleWatcher: keycertbundle.NewWatcher(),
	}
	// Apply custom initialization functions.
	for _, fn := range initFuncs {
		fn(s)
	}
	// Initialize workload Trust Bundle before XDS Server
	e.TrustBundle = s.workloadTrustBundle
	s.XDSServer = xds.NewDiscoveryServer(e, args.PodName, args.RegistryOptions.KubeOptions.ClusterAliases)
	prometheus.EnableHandlingTimeHistogram()
	// Apply the arguments to the configuration.
	if err := s.initKubeClient(args); err != nil {
		return nil, fmt.Errorf("error initializing kube client: %v", err)
	}
	// used for both initKubeRegistry and initClusterRegistries
	args.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)
	// Mesh config must come early: later steps (trust domain, networks) read it.
	s.initMeshConfiguration(args, s.fileWatcher)
	spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
	s.initMeshNetworks(args, s.fileWatcher)
	s.initMeshHandlers()
	s.environment.Init()
	if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {
		return nil, err
	}
	// Options based on the current 'defaults' in istio.
	caOpts := &caOptions{
		TrustDomain:      s.environment.Mesh().TrustDomain,
		Namespace:        args.Namespace,
		ExternalCAType:   ra.CaExternalType(externalCaType),
		CertSignerDomain: features.CertSignerDomain,
	}
	if caOpts.ExternalCAType == ra.ExtCAK8s {
		// Older environment variable preserved for backward compatibility
		caOpts.ExternalCASigner = k8sSigner
	}
	// CA signing certificate must be created first if needed.
	if err := s.maybeCreateCA(caOpts); err != nil {
		return nil, err
	}
	if err := s.initControllers(args); err != nil {
		return nil, err
	}
	s.XDSServer.InitGenerators(e, args.Namespace)
	// Initialize workloadTrustBundle after CA has been initialized
	if err := s.initWorkloadTrustBundle(args); err != nil {
		return nil, err
	}
	// Parse and validate Istiod Address.
	istiodHost, _, err := e.GetDiscoveryAddress()
	if err != nil {
		return nil, err
	}
	// Create Istiod certs and setup watches.
	if err := s.initIstiodCerts(args, string(istiodHost)); err != nil {
		return nil, err
	}
	// metadataServer and snpServer are only created when running with Kubernetes.
	if s.kubeClient != nil {
		s.metadataServer = dubbov1alpha1.NewServiceMetadataServer(s.kubeClient)
	}
	// Create Service Name mapping server
	if s.kubeClient != nil {
		s.snpServer = dubbov1alpha1.NewSnp(s.kubeClient)
	}
	// Secure gRPC Server must be initialized after CA is created as may use a Citadel generated cert.
	if err := s.initSecureDiscoveryService(args); err != nil {
		return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)
	}
	var wh *inject.Webhook
	// common https server for webhooks (e.g. injection, validation)
	if s.kubeClient != nil {
		s.initSecureWebhookServer(args)
		wh, err = s.initSidecarInjector(args)
		if err != nil {
			return nil, fmt.Errorf("error initializing sidecar injector: %v", err)
		}
		if err := s.initConfigValidation(args); err != nil {
			return nil, fmt.Errorf("error initializing config validator: %v", err)
		}
	}
	// whc exposes the injector's raw templates to the debug handlers; empty
	// map when no webhook was created.
	whc := func() map[string]string {
		if wh != nil {
			return wh.Config.RawTemplates
		}
		return map[string]string{}
	}
	// Used for readiness, monitoring and debug handlers.
	if err := s.initIstiodAdminServer(args, whc); err != nil {
		return nil, fmt.Errorf("error initializing debug server: %v", err)
	}
	// This should be called only after controllers are initialized.
	s.initRegistryEventHandlers()
	s.initDiscoveryService(args)
	s.initSDSServer()
	// Notice that the order of authenticators matters, since at runtime
	// authenticators are activated sequentially and the first successful attempt
	// is used as the authentication result.
	authenticators := []security.Authenticator{
		&authenticate.ClientCertAuthenticator{},
	}
	if args.JwtRule != "" {
		jwtAuthn, err := initOIDC(args, s.environment.Mesh().TrustDomain)
		if err != nil {
			return nil, fmt.Errorf("error initializing OIDC: %v", err)
		}
		if jwtAuthn == nil {
			return nil, fmt.Errorf("JWT authenticator is nil")
		}
		authenticators = append(authenticators, jwtAuthn)
	}
	// The k8s JWT authenticator requires the multicluster registry to be initialized,
	// so we build it later.
	authenticators = append(authenticators,
		kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient, s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))
	if features.XDSAuth {
		s.XDSServer.Authenticators = authenticators
	}
	caOpts.Authenticators = authenticators
	// Start CA or RA server. This should be called after CA and Istiod certs have been created.
	s.startCA(caOpts)
	// TODO: don't run this if galley is started, one ctlz is enough
	if args.CtrlZOptions != nil {
		_, _ = ctrlz.Run(args.CtrlZOptions, nil)
	}
	// This must be last, otherwise we will not know which informers to register
	if s.kubeClient != nil {
		s.addStartFunc(func(stop <-chan struct{}) error {
			s.kubeClient.RunAndWait(stop)
			return nil
		})
	}
	s.addReadinessProbe("discovery", func() (bool, error) {
		return s.XDSServer.IsServerReady(), nil
	})
	return s, nil
}
// initOIDC builds a JWT authenticator from the JWT_RULE environment variable.
// args.JwtRule holds a JSON-encoded JWTRule, for example:
// `{"issuer": "foo", "jwks_uri": "baz", "audiences": ["aud1", "aud2"]}`.
func initOIDC(args *PilotArgs, trustDomain string) (security.Authenticator, error) {
	rule := &v1beta1.JWTRule{}
	if err := json.Unmarshal([]byte(args.JwtRule), rule); err != nil {
		return nil, fmt.Errorf("failed to unmarshal JWT rule: %v", err)
	}
	log.Infof("Istiod authenticating using JWTRule: %v", rule)
	authn, err := authenticate.NewJwtAuthenticator(rule, trustDomain)
	if err != nil {
		return nil, fmt.Errorf("failed to create the JWT authenticator: %v", err)
	}
	return authn, nil
}
// getClusterID returns the configured cluster ID, falling back to the
// Kubernetes provider name when a kube registry is configured and no
// explicit ID was given.
func getClusterID(args *PilotArgs) cluster.ID {
	if id := args.RegistryOptions.KubeOptions.ClusterID; id != "" {
		return id
	}
	if hasKubeRegistry(args.RegistryOptions.Registries) {
		return cluster.ID(provider.Kubernetes)
	}
	return ""
}
func isUnexpectedListenerError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, net.ErrClosed) {
return false
}
if errors.Is(err, http.ErrServerClosed) {
return false
}
return true
}
// Start starts all components of the Pilot discovery service on the port specified in DiscoveryServerOptions.
// If Port == 0, a port number is automatically chosen. Content serving is started by this method,
// but is executed asynchronously. Serving can be canceled at any time by closing the provided stop channel.
func (s *Server) Start(stop <-chan struct{}) error {
	log.Infof("Starting Istiod Server with primary cluster %s", s.clusterID)
	if features.UnsafeFeaturesEnabled() {
		log.Warn("Server is starting with unsafe features enabled")
	}
	// Now start all of the components.
	if err := s.server.Start(stop); err != nil {
		return err
	}
	// Block until informer caches are synced; bail out if stop closes first.
	if !s.waitForCacheSync(stop) {
		return fmt.Errorf("failed to sync cache")
	}
	// Inform Discovery Server so that it can start accepting connections.
	s.XDSServer.CachesSynced()
	// Race condition - if waitForCache is too fast and we run this as a startup function,
	// the grpc server would be started before CA is registered. Listening should be last.
	if s.secureGrpcAddress != "" {
		grpcListener, err := net.Listen("tcp", s.secureGrpcAddress)
		if err != nil {
			return err
		}
		go func() {
			log.Infof("starting secure gRPC discovery service at %s", grpcListener.Addr())
			if err := s.secureGrpcServer.Serve(grpcListener); err != nil {
				log.Errorf("error serving secure GRPC server: %v", err)
			}
		}()
	}
	if s.grpcAddress != "" {
		grpcListener, err := net.Listen("tcp", s.grpcAddress)
		if err != nil {
			return err
		}
		go func() {
			log.Infof("starting gRPC discovery service at %s", grpcListener.Addr())
			if err := s.grpcServer.Serve(grpcListener); err != nil {
				log.Errorf("error serving GRPC server: %v", err)
			}
		}()
	}
	if s.MultiplexGRPC {
		log.Infof("multiplexing gRPC services with HTTP services")
		h2s := &http2.Server{
			MaxConcurrentStreams: uint32(features.MaxConcurrentStreams),
		}
		// In the past, we have tried using "cmux" to handle multiplexing. This only works if we have
		// only HTTP/1.1 and gRPC on the same port. If we have gRPC and HTTP2, clients (envoy) may
		// multiplex the connections. cmux works at the connection level, so if the first request is
		// gRPC then all future non-GRPC HTTP2 requests will match the gRPC server and fail. The major
		// downside of multiplexing by using gRPC's ServeHTTP is that we are using the golang HTTP2
		// stack. This means a lot of features on the gRPC server (keepalives, etc) do not apply.
		multiplexHandler := h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// If we detect gRPC, serve using grpcServer
			if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("content-type"), "application/grpc") {
				s.grpcServer.ServeHTTP(w, r)
				return
			}
			// Otherwise, this is meant for the standard HTTP server
			s.httpMux.ServeHTTP(w, r)
		}), h2s)
		s.httpServer.Handler = multiplexHandler
	}
	// At this point we are ready - start Http Listener so that it can respond to readiness events.
	httpListener, err := net.Listen("tcp", s.httpServer.Addr)
	if err != nil {
		return err
	}
	go func() {
		log.Infof("starting HTTP service at %s", httpListener.Addr())
		if err := s.httpServer.Serve(httpListener); isUnexpectedListenerError(err) {
			log.Errorf("error serving http server: %v", err)
		}
	}()
	if s.httpsServer != nil {
		httpsListener, err := net.Listen("tcp", s.httpsServer.Addr)
		if err != nil {
			return err
		}
		go func() {
			log.Infof("starting webhook service at %s", httpsListener.Addr())
			// Empty cert/key paths: certificates come from the server's TLSConfig.
			if err := s.httpsServer.ServeTLS(httpsListener, "", ""); isUnexpectedListenerError(err) {
				log.Errorf("error serving https server: %v", err)
			}
		}()
	}
	// Install the background goroutine that tears everything down on stop.
	s.waitForShutdown(stop)
	return nil
}
// WaitUntilCompletion waits for everything marked as a "required termination" to complete.
// This should be called before exiting.
func (s *Server) WaitUntilCompletion() {
	s.server.Wait()
}
// initSDSServer starts the SDS server.
// It is a no-op without a kube client, and requires
// PILOT_ENABLE_XDS_IDENTITY_CHECK so secrets are only served to verified proxies.
func (s *Server) initSDSServer() {
	if s.kubeClient == nil {
		return
	}
	if !features.EnableXDSIdentityCheck {
		// Make sure we have security
		log.Warnf("skipping Kubernetes credential reader; PILOT_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
		return
	}
	creds := kubecredentials.NewMulticluster(s.clusterID)
	// Push an incremental update whenever a referenced secret changes.
	creds.AddSecretHandler(func(name string, namespace string) {
		s.XDSServer.ConfigUpdate(&model.PushRequest{
			Full: false,
			ConfigsUpdated: map[model.ConfigKey]struct{}{
				{Kind: gvk.Secret, Name: name, Namespace: namespace}: {},
			},
			Reason: []model.TriggerReason{model.SecretTrigger},
		})
	})
	s.XDSServer.Generators[v3.SecretType] = xds.NewSecretGen(creds, s.XDSServer.Cache, s.clusterID, s.environment.Mesh())
	s.multiclusterController.AddHandler(creds)
	// Wire the credential controller into ECDS generation when present.
	if gen, ok := s.XDSServer.Generators[v3.ExtensionConfigurationType]; ok {
		gen.(*xds.EcdsGenerator).SetCredController(creds)
	}
}
// initKubeClient creates the k8s client if running in an k8s environment.
// This is determined by the presence of a kube registry, which
// uses in-context k8s, or a config source of type k8s.
func (s *Server) initKubeClient(args *PilotArgs) error {
	if s.kubeClient != nil {
		// Already initialized by startup arguments
		return nil
	}
	hasK8SConfigStore := false
	if args.RegistryOptions.FileDir == "" {
		// If file dir is set - config controller will just use file.
		// Note: !IsNotExist also lets non-NotExist Stat errors fall through to
		// ReadMeshConfig, which will surface them as a read failure.
		if _, err := os.Stat(args.MeshConfigFile); !os.IsNotExist(err) {
			meshConfig, err := mesh.ReadMeshConfig(args.MeshConfigFile)
			if err != nil {
				return fmt.Errorf("failed reading mesh config: %v", err)
			}
			// No explicit config sources + a kubeconfig means the default k8s store.
			if len(meshConfig.ConfigSources) == 0 && args.RegistryOptions.KubeConfig != "" {
				hasK8SConfigStore = true
			}
			for _, cs := range meshConfig.ConfigSources {
				if cs.Address == string(Kubernetes)+"://" {
					hasK8SConfigStore = true
					break
				}
			}
		} else if args.RegistryOptions.KubeConfig != "" {
			hasK8SConfigStore = true
		}
	}
	if hasK8SConfigStore || hasKubeRegistry(args.RegistryOptions.Registries) {
		// Used by validation
		kubeRestConfig, err := kubelib.DefaultRestConfig(args.RegistryOptions.KubeConfig, "", func(config *rest.Config) {
			config.QPS = args.RegistryOptions.KubeOptions.KubernetesAPIQPS
			config.Burst = args.RegistryOptions.KubeOptions.KubernetesAPIBurst
		})
		if err != nil {
			return fmt.Errorf("failed creating kube config: %v", err)
		}
		s.kubeClient, err = kubelib.NewClient(kubelib.NewClientConfigForRestConfig(kubeRestConfig))
		if err != nil {
			return fmt.Errorf("failed creating kube client: %v", err)
		}
	}
	return nil
}
// A single container can't have two readiness probes. Make this readiness probe a generic one
// that can handle all istiod related readiness checks including webhook, gRPC etc.
// The "http" portion of the readiness check is satisfied by the fact we've started listening on
// this handler and everything has already initialized.
func (s *Server) istiodReadyHandler(w http.ResponseWriter, _ *http.Request) {
	for name, probe := range s.readinessProbes {
		ready, err := probe()
		if ready {
			continue
		}
		// Report the first failing probe and stop; order is map-random.
		log.Warnf("%s is not ready: %v", name, err)
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
}
// initIstiodAdminServer initializes monitoring, debug and readiness end points.
// When no dedicated monitoring address is configured, the monitoring mux is
// shared with the main HTTP mux.
func (s *Server) initIstiodAdminServer(args *PilotArgs, whc func() map[string]string) error {
	s.httpServer = &http.Server{
		Addr:        args.ServerOptions.HTTPAddr,
		Handler:     s.httpMux,
		IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
		ReadTimeout: 30 * time.Second,
	}
	shouldMultiplex := args.ServerOptions.MonitoringAddr == ""
	if shouldMultiplex {
		s.monitoringMux = s.httpMux
		log.Info("initializing Istiod admin server multiplexed on httpAddr ", s.httpServer.Addr)
	} else {
		log.Info("initializing Istiod admin server")
	}
	// Debug Server.
	s.XDSServer.InitDebug(s.monitoringMux, s.ServiceController(), args.ServerOptions.EnableProfiling, whc)
	// Debug handlers are currently added on monitoring mux and readiness mux.
	// If monitoring addr is empty, the mux is shared and we only add it once on the shared mux .
	if !shouldMultiplex {
		s.XDSServer.AddDebugHandlers(s.httpMux, nil, args.ServerOptions.EnableProfiling, whc)
	}
	// Monitoring Server.
	if err := s.initMonitor(args.ServerOptions.MonitoringAddr); err != nil {
		return fmt.Errorf("error initializing monitor: %v", err)
	}
	// Readiness Handler.
	s.httpMux.HandleFunc("/ready", s.istiodReadyHandler)
	return nil
}
// initDiscoveryService initializes discovery server on plain text port.
// The SNP and metadata start functions are only registered when the
// corresponding servers exist; NewServer constructs them only when a kube
// client is available, so unconditional use would panic outside Kubernetes.
func (s *Server) initDiscoveryService(args *PilotArgs) {
	log.Infof("starting discovery service")
	// Implement EnvoyXdsServer grace shutdown
	s.addStartFunc(func(stop <-chan struct{}) error {
		log.Infof("Starting ADS server")
		s.XDSServer.Start(stop)
		return nil
	})
	// Implement ServiceNameMapping grace shutdown
	if s.snpServer != nil {
		s.addStartFunc(func(stop <-chan struct{}) error {
			log.Infof("Starting SNP server")
			s.snpServer.Start(stop)
			return nil
		})
	}
	// Implement ServiceMetadata grace shutdown
	if s.metadataServer != nil {
		s.addStartFunc(func(stop <-chan struct{}) error {
			log.Infof("Starting MetaData server")
			s.metadataServer.Start(stop)
			return nil
		})
	}
	s.initGrpcServer(args.KeepaliveOptions)
	if args.ServerOptions.GRPCAddr != "" {
		s.grpcAddress = args.ServerOptions.GRPCAddr
	} else {
		// This happens only if the GRPC port (15010) is disabled. We will multiplex
		// it on the HTTP port. Does not impact the HTTPS gRPC or HTTPS.
		log.Info("multiplexing gRPC on http addr ", args.ServerOptions.HTTPAddr)
		s.MultiplexGRPC = true
	}
}
// waitForShutdown waits for the stop channel, and does cleanups:
// closes watchers, gracefully stops gRPC (with a forced stop after
// shutdownDuration), shuts down the HTTP servers and the discovery server.
func (s *Server) waitForShutdown(stop <-chan struct{}) {
	go func() {
		<-stop
		close(s.internalStop)
		_ = s.fileWatcher.Close()
		if s.cacertsWatcher != nil {
			_ = s.cacertsWatcher.Close()
		}
		// Stop gRPC services. If gRPC services fail to stop in the shutdown duration,
		// force stop them. This does not happen normally.
		stopped := make(chan struct{})
		go func() {
			// Some grpcServer implementations do not support GracefulStop. Unfortunately, this is not
			// exposed; they just panic. To avoid this, we will recover and do a standard Stop when its not
			// support.
			defer func() {
				if r := recover(); r != nil {
					s.grpcServer.Stop()
					if s.secureGrpcServer != nil {
						s.secureGrpcServer.Stop()
					}
					close(stopped)
				}
			}()
			s.grpcServer.GracefulStop()
			if s.secureGrpcServer != nil {
				s.secureGrpcServer.GracefulStop()
			}
			close(stopped)
		}()
		// Force-stop if graceful shutdown does not finish within shutdownDuration.
		t := time.NewTimer(s.shutdownDuration)
		select {
		case <-t.C:
			s.grpcServer.Stop()
			if s.secureGrpcServer != nil {
				s.secureGrpcServer.Stop()
			}
		case <-stopped:
			t.Stop()
		}
		// Stop HTTP services.
		ctx, cancel := context.WithTimeout(context.Background(), s.shutdownDuration)
		defer cancel()
		if err := s.httpServer.Shutdown(ctx); err != nil {
			log.Warn(err)
		}
		if s.httpsServer != nil {
			if err := s.httpsServer.Shutdown(ctx); err != nil {
				log.Warn(err)
			}
		}
		// Shutdown the DiscoveryServer.
		s.XDSServer.Shutdown()
	}()
}
// initGrpcServer creates the plain-text gRPC server and registers the XDS,
// reflection, metadata and SNP services on it.
func (s *Server) initGrpcServer(options *istiokeepalive.Options) {
	interceptors := []grpc.UnaryServerInterceptor{
		// setup server prometheus monitoring (as final interceptor in chain)
		prometheus.UnaryServerInterceptor,
	}
	grpcOptions := istiogrpc.ServerOptions(options, interceptors...)
	s.grpcServer = grpc.NewServer(grpcOptions...)
	s.XDSServer.Register(s.grpcServer)
	reflection.Register(s.grpcServer)
	// metadataServer and snpServer are only constructed when a kube client is
	// available (see NewServer); guard against nil when running without k8s.
	if s.metadataServer != nil {
		s.metadataServer.Register(s.grpcServer)
	}
	if s.snpServer != nil {
		s.snpServer.Register(s.grpcServer)
	}
}
// initSecureDiscoveryService initializes the secure gRPC server (secureGrpcServer).
// It is skipped when no secure address is configured or when no peer cert
// verifier can be built (local runs without certs).
func (s *Server) initSecureDiscoveryService(args *PilotArgs) error {
	if args.ServerOptions.SecureGRPCAddr == "" {
		log.Info("The secure discovery port is disabled, multiplexing on httpAddr ")
		return nil
	}
	peerCertVerifier, err := s.createPeerCertVerifier(args.ServerOptions.TLSOptions)
	if err != nil {
		return err
	}
	if peerCertVerifier == nil {
		// Running locally without configured certs - no TLS mode
		log.Warnf("The secure discovery service is disabled")
		return nil
	}
	log.Info("initializing secure discovery service")
	cfg := &tls.Config{
		GetCertificate: s.getIstiodCertificate,
		ClientAuth:     tls.VerifyClientCertIfGiven,
		ClientCAs:      peerCertVerifier.GetGeneralCertPool(),
		VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
			err := peerCertVerifier.VerifyPeerCert(rawCerts, verifiedChains)
			if err != nil {
				log.Infof("Could not verify certificate: %v", err)
			}
			return err
		},
		MinVersion:   tls.VersionTLS12,
		CipherSuites: args.ServerOptions.TLSOptions.CipherSuits,
	}
	tlsCreds := credentials.NewTLS(cfg)
	s.secureGrpcAddress = args.ServerOptions.SecureGRPCAddr
	interceptors := []grpc.UnaryServerInterceptor{
		// setup server prometheus monitoring (as final interceptor in chain)
		prometheus.UnaryServerInterceptor,
	}
	opts := istiogrpc.ServerOptions(args.KeepaliveOptions, interceptors...)
	opts = append(opts, grpc.Creds(tlsCreds))
	s.secureGrpcServer = grpc.NewServer(opts...)
	s.XDSServer.Register(s.secureGrpcServer)
	reflection.Register(s.secureGrpcServer)
	// metadataServer and snpServer are only constructed when a kube client is
	// available (see NewServer); guard against nil when running without k8s.
	if s.metadataServer != nil {
		s.metadataServer.Register(s.secureGrpcServer)
	}
	if s.snpServer != nil {
		s.snpServer.Register(s.secureGrpcServer)
	}
	s.addStartFunc(func(stop <-chan struct{}) error {
		go func() {
			<-stop
			s.secureGrpcServer.Stop()
		}()
		return nil
	})
	return nil
}
// addStartFunc appends a function to be run. These are run synchronously in order,
// so the function should start a go routine if it needs to do anything blocking
func (s *Server) addStartFunc(fn server.Component) {
	s.server.RunComponent(fn)
}
// addReadinessProbe registers a named readiness probe for the Istiod Server;
// all registered probes must pass for /ready to return 200.
func (s *Server) addReadinessProbe(name string, fn readinessProbe) {
	s.readinessProbes[name] = fn
}
// addTerminatingStartFunc adds a function that should terminate before the serve shuts down
// This is useful to do cleanup activities
// This is does not guarantee they will terminate gracefully - best effort only
// Function should be synchronous; once it returns it is considered "done"
func (s *Server) addTerminatingStartFunc(fn server.Component) {
	s.server.RunComponentAsyncAndWait(fn)
}
// waitForCacheSync blocks until all controller caches have synced and the
// push context has processed the initial state-of-the-world updates.
// Returns false if the stop channel closes before sync completes.
func (s *Server) waitForCacheSync(stop <-chan struct{}) bool {
	start := time.Now()
	log.Info("Waiting for caches to be synced")
	if !cache.WaitForCacheSync(stop, s.cachesSynced) {
		log.Errorf("Failed waiting for cache sync")
		return false
	}
	log.Infof("All controller caches have been synced up in %v", time.Since(start))
	// At this point, we know that all update events of the initial state-of-the-world have been
	// received. We wait to ensure we have committed at least this many updates. This avoids a race
	// condition where we are marked ready prior to updating the push context, leading to incomplete
	// pushes.
	expected := s.XDSServer.InboundUpdates.Load()
	if !cache.WaitForCacheSync(stop, func() bool { return s.pushContextReady(expected) }) {
		log.Errorf("Failed waiting for push context initialization")
		return false
	}
	return true
}
// pushContextReady indicates whether pushcontext has processed all inbound config updates.
func (s *Server) pushContextReady(expected int64) bool {
	committed := s.XDSServer.CommittedUpdates.Load()
	if committed >= expected {
		return true
	}
	log.Debugf("Waiting for pushcontext to process inbound updates, inbound: %v, committed : %v", expected, committed)
	return false
}
// cachesSynced checks whether caches have been synced.
// The multicluster controller is optional; service and config controllers
// are always checked.
func (s *Server) cachesSynced() bool {
	mcc := s.multiclusterController
	switch {
	case mcc != nil && !mcc.HasSynced():
		return false
	case !s.ServiceController().HasSynced():
		return false
	case !s.configController.HasSynced():
		return false
	default:
		return true
	}
}
// initRegistryEventHandlers sets up event handlers for config and service updates
func (s *Server) initRegistryEventHandlers() {
	log.Info("initializing registry event handlers")
	// Flush cached discovery responses whenever services configuration change.
	serviceHandler := func(svc *model.Service, _ model.Event) {
		pushReq := &model.PushRequest{
			Full: true,
			ConfigsUpdated: map[model.ConfigKey]struct{}{{
				Kind:      gvk.ServiceEntry,
				Name:      string(svc.Hostname),
				Namespace: svc.Attributes.Namespace,
			}: {}},
			Reason: []model.TriggerReason{model.ServiceUpdate},
		}
		s.XDSServer.ConfigUpdate(pushReq)
	}
	s.ServiceController().AppendServiceHandler(serviceHandler)
	if s.configController != nil {
		configHandler := func(prev config.Config, curr config.Config, event model.Event) {
			// Always record distribution-status bookkeeping, even when the
			// push itself is skipped below.
			defer func() {
				if event != model.EventDelete {
					s.statusReporter.AddInProgressResource(curr)
				} else {
					s.statusReporter.DeleteInProgressResource(curr)
				}
			}()
			// For update events, trigger push only if spec has changed.
			if event == model.EventUpdate && !needsPush(prev, curr) {
				log.Debugf("skipping push for %s as spec has not changed", prev.Key())
				return
			}
			pushReq := &model.PushRequest{
				Full: true,
				ConfigsUpdated: map[model.ConfigKey]struct{}{{
					Kind:      curr.GroupVersionKind,
					Name:      curr.Name,
					Namespace: curr.Namespace,
				}: {}},
				Reason: []model.TriggerReason{model.ConfigUpdate},
			}
			s.XDSServer.ConfigUpdate(pushReq)
		}
		schemas := collections.Pilot.All()
		if features.EnableGatewayAPI {
			schemas = collections.PilotGatewayAPI.All()
		}
		for _, schema := range schemas {
			// This resource type was handled in external/servicediscovery.go, no need to rehandle here.
			if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Serviceentries.
				Resource().GroupVersionKind() {
				continue
			}
			// WorkloadEntry is also handled by the serviceentry controller.
			if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadentries.
				Resource().GroupVersionKind() {
				continue
			}
			// WorkloadGroup is likewise handled elsewhere.
			if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadgroups.
				Resource().GroupVersionKind() {
				continue
			}
			s.configController.RegisterEventHandler(schema.Resource().GroupVersionKind(), configHandler)
		}
		if s.environment.GatewayAPIController != nil {
			// Namespace changes can affect Gateway API routing; trigger a full push.
			s.environment.GatewayAPIController.RegisterEventHandler(gvk.Namespace, func(config.Config, config.Config, model.Event) {
				s.XDSServer.ConfigUpdate(&model.PushRequest{
					Full:   true,
					Reason: []model.TriggerReason{model.NamespaceUpdate},
				})
			})
		}
	}
}
// initIstiodCertLoader performs the initial istiod certificate load and then
// starts a goroutine that reloads the cert whenever the bundle watcher fires.
func (s *Server) initIstiodCertLoader() error {
	if err := s.loadIstiodCert(); err != nil {
		return fmt.Errorf("first time load IstiodCert failed: %v", err)
	}
	_, watchCh := s.istiodCertBundleWatcher.AddWatcher()
	s.addStartFunc(func(stop <-chan struct{}) error {
		go s.reloadIstiodCert(watchCh, stop)
		return nil
	})
	return nil
}
// initIstiodCerts creates Istiod certificates and also sets up watches to them.
// It first builds the DNS SAN list (discovery host, custom hosts, well-known
// istiod service names), then provisions certificates according to the
// configured provider. The three DNS-cert providers previously duplicated the
// same body; that is factored into the initDNS closure.
func (s *Server) initIstiodCerts(args *PilotArgs, host string) error {
	// Append custom hostname if there is any
	customHost := features.IstiodServiceCustomHost
	s.dnsNames = []string{host}
	cHosts := strings.Split(customHost, ",")
	for _, cHost := range cHosts {
		if cHost != "" && cHost != host {
			log.Infof("Adding custom hostname %s", cHost)
			s.dnsNames = append(s.dnsNames, cHost)
		}
	}
	// The first is the recommended one, also used by Apiserver for webhooks.
	// add a few known hostnames
	knownHosts := []string{"istiod", "istiod-remote", "istio-pilot"}
	// In some conditions, pilot address for sds is different from other xds,
	// like multi-cluster primary-remote mode with revision.
	if args.Revision != "" && args.Revision != "dubbo" {
		knownHosts = append(knownHosts, "istiod"+"-"+args.Revision)
	}
	for _, altName := range knownHosts {
		name := fmt.Sprintf("%v.%v.svc", altName, args.Namespace)
		exist := false
		for _, cHost := range cHosts {
			if name == host || name == cHost {
				exist = true
			}
		}
		if !exist {
			s.dnsNames = append(s.dnsNames, name)
		}
	}
	// initDNS provisions DNS certificates and installs the cert loader; shared
	// by the istiod, kubernetes and kubernetes-signer providers below.
	initDNS := func() error {
		log.Infof("initializing Istiod DNS certificates host: %s, custom host: %s", host, features.IstiodServiceCustomHost)
		if err := s.initDNSCerts(host, args.Namespace); err != nil {
			return err
		}
		return s.initIstiodCertLoader()
	}
	switch {
	case hasCustomTLSCerts(args.ServerOptions.TLSOptions):
		// Use the DNS certificate provided via args.
		if err := s.initCertificateWatches(args.ServerOptions.TLSOptions); err != nil {
			// Not crashing istiod - This typically happens if certs are missing and in tests.
			log.Errorf("error initializing certificate watches: %v", err)
			return nil
		}
		return s.initIstiodCertLoader()
	case features.PilotCertProvider == constants.CertProviderNone:
		// Skip all certificates
		return nil
	case s.EnableCA() && features.PilotCertProvider == constants.CertProviderIstiod:
		return initDNS()
	case features.PilotCertProvider == constants.CertProviderKubernetes:
		return initDNS()
	case strings.HasPrefix(features.PilotCertProvider, constants.CertProviderKubernetesSignerPrefix):
		return initDNS()
	}
	return nil
}
// createPeerCertVerifier creates a SPIFFE certificate verifier with the current istiod configuration.
// Root certs are collected from an explicit CA cert file when provided,
// otherwise from the RA (mesh-config signer bundle or RA key/cert bundle) and
// the in-process CA; SPIFFE bundle endpoint roots are added on top.
// Returns (nil, nil) when no TLS material is configured at all.
func (s *Server) createPeerCertVerifier(tlsOptions TLSOptions) (*spiffe.PeerCertVerifier, error) {
	if tlsOptions.CaCertFile == "" && s.CA == nil && features.SpiffeBundleEndpoints == "" && !s.isDisableCa() {
		// Running locally without configured certs - no TLS mode
		return nil, nil
	}
	peerCertVerifier := spiffe.NewPeerCertVerifier()
	var rootCertBytes []byte
	var err error
	if tlsOptions.CaCertFile != "" {
		if rootCertBytes, err = os.ReadFile(tlsOptions.CaCertFile); err != nil {
			return nil, err
		}
	} else {
		if s.RA != nil {
			if strings.HasPrefix(features.PilotCertProvider, constants.CertProviderKubernetesSignerPrefix) {
				signerName := strings.TrimPrefix(features.PilotCertProvider, constants.CertProviderKubernetesSignerPrefix)
				caBundle, caErr := s.RA.GetRootCertFromMeshConfig(signerName)
				if caErr != nil {
					// Best effort: continue with whatever other roots are
					// available, but surface the problem instead of silently
					// dropping it (previously the error was discarded).
					log.Warnf("failed to get root cert for signer %q from mesh config: %v", signerName, caErr)
				}
				rootCertBytes = append(rootCertBytes, caBundle...)
			} else {
				rootCertBytes = append(rootCertBytes, s.RA.GetCAKeyCertBundle().GetRootCertPem()...)
			}
		}
		if s.CA != nil {
			rootCertBytes = append(rootCertBytes, s.CA.GetCAKeyCertBundle().GetRootCertPem()...)
		}
	}
	if len(rootCertBytes) != 0 {
		err := peerCertVerifier.AddMappingFromPEM(spiffe.GetTrustDomain(), rootCertBytes)
		if err != nil {
			return nil, fmt.Errorf("add root CAs into peerCertVerifier failed: %v", err)
		}
	}
	if features.SpiffeBundleEndpoints != "" {
		certMap, err := spiffe.RetrieveSpiffeBundleRootCertsFromStringInput(
			features.SpiffeBundleEndpoints, []*x509.Certificate{})
		if err != nil {
			return nil, err
		}
		peerCertVerifier.AddMappings(certMap)
	}
	return peerCertVerifier, nil
}
// hasCustomTLSCerts returns true if custom TLS certificates are configured via args.
// All three files (CA cert, server cert, and key) must be present.
func hasCustomTLSCerts(tlsOptions TLSOptions) bool {
	switch {
	case tlsOptions.CaCertFile == "", tlsOptions.CertFile == "", tlsOptions.KeyFile == "":
		return false
	default:
		return true
	}
}
// getIstiodCertificate returns the istiod certificate, guarded by the cert
// mutex; it errors if no certificate has been loaded yet.
func (s *Server) getIstiodCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) {
	s.certMu.RLock()
	defer s.certMu.RUnlock()
	if s.istiodCert == nil {
		return nil, fmt.Errorf("cert not initialized")
	}
	return s.istiodCert, nil
}
// initControllers initializes the controllers.
func (s *Server) initControllers(args *PilotArgs) error {
	log.Info("initializing controllers")
	s.initMulticluster(args)
	// Certificate controller is created before MCP controller in case MCP server pod
	// waits to mount a certificate to be provisioned by the certificate controller.
	steps := []struct {
		errFmt string
		run    func(*PilotArgs) error
	}{
		{"error initializing certificate controller: %v", s.initCertController},
		{"error initializing config controller: %v", s.initConfigController},
		{"error initializing service controllers: %v", s.initServiceControllers},
	}
	for _, step := range steps {
		if err := step.run(args); err != nil {
			return fmt.Errorf(step.errFmt, err)
		}
	}
	return nil
}
// initMulticluster wires up the multicluster controller; it is a no-op when
// no kube client is configured.
func (s *Server) initMulticluster(args *PilotArgs) {
	if s.kubeClient == nil {
		return
	}
	mc := multicluster.NewController(s.kubeClient, args.Namespace, s.clusterID)
	s.multiclusterController = mc
	s.XDSServer.ListRemoteClusters = mc.ListRemoteClusters
	s.addStartFunc(func(stop <-chan struct{}) error {
		return s.multiclusterController.Run(stop)
	})
}
// maybeCreateCA creates and initializes CA Key if needed.
func (s *Server) maybeCreateCA(caOpts *caOptions) error {
	// CA signing certificate must be created only if CA is enabled.
	if !s.EnableCA() {
		return nil
	}
	log.Info("creating CA and initializing public key")
	var err error
	var corev1 v1.CoreV1Interface
	if s.kubeClient != nil {
		corev1 = s.kubeClient.CoreV1()
	}
	if useRemoteCerts.Get() {
		if err = s.loadRemoteCACerts(caOpts, LocalCertDir.Get()); err != nil {
			return fmt.Errorf("failed to load remote CA certs: %v", err)
		}
	}
	// May return nil, if the CA is missing required configs - This is not an error.
	if caOpts.ExternalCAType != "" {
		if s.RA, err = s.createIstioRA(s.kubeClient, caOpts); err != nil {
			return fmt.Errorf("failed to create RA: %v", err)
		}
	}
	if !s.isDisableCa() {
		if s.CA, err = s.createIstioCA(corev1, caOpts); err != nil {
			return fmt.Errorf("failed to create CA: %v", err)
		}
	}
	return nil
}
// shouldStartNsController reports whether the namespace controller that
// distributes the CA root should run.
func (s *Server) shouldStartNsController() bool {
	if s.isDisableCa() {
		return true
	}
	switch {
	case s.CA == nil:
		// Nothing to distribute without a CA.
		return false
	case features.PilotCertProvider == constants.CertProviderKubernetes:
		// For Kubernetes CA, we don't distribute it; it is mounted in all pods by Kubernetes.
		return false
	case features.PilotCertProvider == constants.CertProviderNone:
		// For no CA we don't distribute it either, as there is no cert.
		return false
	default:
		return true
	}
}
// startCA starts the CA or RA server if configured, preferring the secure
// gRPC server when one exists.
func (s *Server) startCA(caOpts *caOptions) {
	if s.CA == nil && s.RA == nil {
		return
	}
	s.addStartFunc(func(stop <-chan struct{}) error {
		// Default to the plaintext server and override with the secure one.
		grpcServer := s.grpcServer
		if s.secureGrpcServer != nil {
			grpcServer = s.secureGrpcServer
		}
		// Start the RA server if configured, else start the CA server.
		switch {
		case s.RA != nil:
			log.Infof("Starting RA")
			s.RunCA(grpcServer, s.RA, caOpts)
		case s.CA != nil:
			log.Infof("Starting IstioD CA")
			s.RunCA(grpcServer, s.CA, caOpts)
		}
		return nil
	})
}
// initMeshHandlers initializes mesh and network handlers.
func (s *Server) initMeshHandlers() {
	log.Info("initializing mesh handlers")
	// When the mesh config or networks change, do a full push.
	s.environment.AddMeshHandler(func() {
		spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
		s.XDSServer.ConfigGenerator.MeshConfigChanged(s.environment.Mesh())
		req := &model.PushRequest{
			Full:   true,
			Reason: []model.TriggerReason{model.GlobalUpdate},
		}
		s.XDSServer.ConfigUpdate(req)
	})
}
// addIstioCAToTrustBundle derives the trust anchor directly from the
// in-process Istio CA (when one exists) and records it in the workload trust
// bundle. It is a no-op when no CA is configured.
func (s *Server) addIstioCAToTrustBundle(args *PilotArgs) error {
	if s.CA == nil {
		return nil
	}
	rootCerts := []string{string(s.CA.GetCAKeyCertBundle().GetRootCertPem())}
	update := &tb.TrustAnchorUpdate{
		TrustAnchorConfig: tb.TrustAnchorConfig{Certs: rootCerts},
		Source:            tb.SourceIstioCA,
	}
	if err := s.workloadTrustBundle.UpdateTrustAnchor(update); err != nil {
		log.Errorf("unable to add CA root from namespace %s as trustAnchor", args.Namespace)
		return err
	}
	return nil
}
// initWorkloadTrustBundle wires up the workload trust bundle for multi-root
// mesh support; it is a no-op unless features.MultiRootMesh is enabled.
// Trust anchors are aggregated in order from: remote endpoints (polled in the
// background), the mesh config (initial roots plus an update handler), the
// in-process Istio CA, and the Istio RA. Any change to the bundle triggers a
// full XDS push.
func (s *Server) initWorkloadTrustBundle(args *PilotArgs) error {
	var err error
	if !features.MultiRootMesh {
		return nil
	}
	// Push all proxies whenever the set of trust anchors changes.
	s.workloadTrustBundle.UpdateCb(func() {
		pushReq := &model.PushRequest{
			Full:   true,
			Reason: []model.TriggerReason{model.GlobalUpdate},
		}
		s.XDSServer.ConfigUpdate(pushReq)
	})
	// Poll remote trust anchors in the background until the server stops.
	s.addStartFunc(func(stop <-chan struct{}) error {
		go s.workloadTrustBundle.ProcessRemoteTrustAnchors(stop, tb.RemoteDefaultPollPeriod)
		return nil
	})
	// MeshConfig: Add initial roots.
	err = s.workloadTrustBundle.AddMeshConfigUpdate(s.environment.Mesh())
	if err != nil {
		return err
	}
	// MeshConfig: Add callback for mesh config updates.
	s.environment.AddMeshHandler(func() {
		// Best effort: errors from subsequent mesh-config updates are ignored.
		_ = s.workloadTrustBundle.AddMeshConfigUpdate(s.environment.Mesh())
	})
	err = s.addIstioCAToTrustBundle(args)
	if err != nil {
		return err
	}
	// IstioRA: Explicitly add roots corresponding to RA.
	if s.RA != nil {
		// Implicitly add the Istio RA certificates to the Workload Trust Bundle.
		rootCerts := []string{string(s.RA.GetCAKeyCertBundle().GetRootCertPem())}
		err = s.workloadTrustBundle.UpdateTrustAnchor(&tb.TrustAnchorUpdate{
			TrustAnchorConfig: tb.TrustAnchorConfig{Certs: rootCerts},
			Source:            tb.SourceIstioRA,
		})
		if err != nil {
			log.Errorf("fatal: unable to add RA root as trustAnchor")
			return err
		}
	}
	log.Infof("done initializing workload trustBundle")
	return nil
}
// isDisableCa returns whether CA functionality is disabled in istiod.
// It returns true only if istiod certs are signed by Kubernetes and
// workload certs are signed by an external CA (i.e. an RA is configured and
// PilotCertProvider is `kubernetes` or `k8s.io/*`).
func (s *Server) isDisableCa() bool {
	if s.RA == nil {
		return false
	}
	provider := features.PilotCertProvider
	return provider == constants.CertProviderKubernetes ||
		strings.HasPrefix(provider, constants.CertProviderKubernetesSignerPrefix)
}
// initStatusManager registers a start function that creates and starts the
// config status manager once the server runs.
func (s *Server) initStatusManager(_ *PilotArgs) {
	s.addStartFunc(func(stop <-chan struct{}) error {
		mgr := status.NewManager(s.RWConfigStore)
		s.statusManager = mgr
		mgr.Start(stop)
		return nil
	})
}