package multicluster
import (
corev1 ""
metav1 ""
// MultiClusterSecretLabel is the label key that marks a Secret as a multicluster secret; only
// Secrets carrying this label with the value "true" are watched by the Controller.
const MultiClusterSecretLabel = "istio/multiCluster"
// init runs at package initialization.
// NOTE(review): no body or closing brace follows this header in this copy of the file;
// presumably it registers the metrics declared below (timeouts, clustersCount) — confirm.
func init() {
// Metrics reported by the multicluster secret controller.
var (
	// timeouts is a sum metric described by the string below.
	// NOTE(review): the metric-name argument and the closing parenthesis of this call appear to be
	// missing in this copy of the file — confirm.
	timeouts = monitoring.NewSum(
		"Number of times remote clusters took too long to sync, causing slow startup that excludes remote clusters.",
	// clusterType is the "cluster_type" label that splits clustersCount into local and remote series.
	clusterType = monitoring.MustCreateLabel("cluster_type")
	// clustersCount is a gauge of the clusters managed by istiod.
	// NOTE(review): as with timeouts, the metric-name argument, any label options, and the closing
	// parenthesis appear to be missing here — confirm.
	clustersCount = monitoring.NewGauge(
		"Number of clusters managed by istiod",
	// localClusters and remoteClusters are the clustersCount series for each cluster_type value.
	localClusters = clustersCount.With(clusterType.Value("local"))
	remoteClusters = clustersCount.With(clusterType.Value("remote"))
type ClusterHandler interface {
ClusterAdded(cluster *Cluster, stop <-chan struct{}) error
ClusterUpdated(cluster *Cluster, stop <-chan struct{}) error
ClusterDeleted(clusterID cluster.ID) error
// Controller is the controller implementation for Secret resources
type Controller struct {
namespace string
localClusterID cluster.ID
localClusterClient kube.Client
queue controllers.Queue
informer cache.SharedIndexInformer
cs *ClusterStore
handlers []ClusterHandler
once sync.Once
syncInterval time.Duration
remoteSyncTimeout atomic.Bool
// Cluster defines cluster struct
type Cluster struct {
// ID of the cluster.
ID cluster.ID
// SyncTimeout is marked after features.RemoteClusterTimeout.
SyncTimeout *atomic.Bool
// Client for accessing the cluster.
Client kube.Client
kubeConfigSha [sha256.Size]byte
stop chan struct{}
// initialSync is marked when RunAndWait completes
initialSync *atomic.Bool
// Stop channel which is closed when the cluster is removed or the Controller that created the client is stopped.
// Client.RunAndWait is called using this channel.
func (r *Cluster) Stop() <-chan struct{} {
return r.stop
func (c *Controller) AddHandler(h ClusterHandler) {
log.Infof("handling remote clusters in %T", h)
c.handlers = append(c.handlers, h)
// Run starts the cluster's informers and waits for caches to sync. Once caches are synced, we mark the cluster synced.
// This should be called after each of the handlers have registered informers, and should be run in a goroutine.
func (r *Cluster) Run() {
func (r *Cluster) HasSynced() bool {
return r.initialSync.Load() || r.SyncTimeout.Load()
func (r *Cluster) SyncDidTimeout() bool {
return r.SyncTimeout.Load() && !r.HasSynced()
// ClusterStore is a collection of clusters
type ClusterStore struct {
// keyed by secret key(ns/name)->clusterID
remoteClusters map[string]map[cluster.ID]*Cluster
clusters sets.Set
// newClustersStore initializes data struct to store clusters information
func newClustersStore() *ClusterStore {
return &ClusterStore{
remoteClusters: make(map[string]map[cluster.ID]*Cluster),
clusters: sets.New(),
func (c *ClusterStore) Store(secretKey string, clusterID cluster.ID, value *Cluster) {
defer c.Unlock()
if _, ok := c.remoteClusters[secretKey]; !ok {
c.remoteClusters[secretKey] = make(map[cluster.ID]*Cluster)
c.remoteClusters[secretKey][clusterID] = value
func (c *ClusterStore) Delete(secretKey string, clusterID cluster.ID) {
defer c.Unlock()
delete(c.remoteClusters[secretKey], clusterID)
if len(c.remoteClusters[secretKey]) == 0 {
delete(c.remoteClusters, secretKey)
func (c *ClusterStore) Get(secretKey string, clusterID cluster.ID) *Cluster {
defer c.RUnlock()
if _, ok := c.remoteClusters[secretKey]; !ok {
return nil
return c.remoteClusters[secretKey][clusterID]
func (c *ClusterStore) Contains(clusterID cluster.ID) bool {
defer c.RUnlock()
return c.clusters.Contains(string(clusterID))
func (c *ClusterStore) GetByID(clusterID cluster.ID) *Cluster {
defer c.RUnlock()
for _, clusters := range c.remoteClusters {
c, ok := clusters[clusterID]
if ok {
return c
return nil
// All returns a copy of the current remote clusters.
func (c *ClusterStore) All() map[string]map[cluster.ID]*Cluster {
if c == nil {
return nil
defer c.RUnlock()
out := make(map[string]map[cluster.ID]*Cluster, len(c.remoteClusters))
for secret, clusters := range c.remoteClusters {
out[secret] = make(map[cluster.ID]*Cluster, len(clusters))
for cid, c := range clusters {
outCluster := *c
out[secret][cid] = &outCluster
return out
// GetExistingClustersFor return existing clusters registered for the given secret
func (c *ClusterStore) GetExistingClustersFor(secretKey string) []*Cluster {
defer c.RUnlock()
out := make([]*Cluster, 0, len(c.remoteClusters[secretKey]))
for _, cluster := range c.remoteClusters[secretKey] {
out = append(out, cluster)
return out
func (c *ClusterStore) Len() int {
defer c.Unlock()
out := 0
for _, clusterMap := range c.remoteClusters {
out += len(clusterMap)
return out
// NewController returns a new secret controller
//
// The returned Controller watches Secrets in namespace labeled MultiClusterSecretLabel=true through
// a shared informer, reconciles them through a queue whose reconciler is processItem, and treats
// kubeclientset / localClusterID as the local cluster.
func NewController(kubeclientset kube.Client, namespace string, localClusterID cluster.ID) *Controller {
	// NOTE(review): the ListFunc/WatchFunc fields below appear without their enclosing literal
	// (presumably &cache.ListWatch{...}) or closing braces in this copy of the file — confirm.
	secretsInformer := cache.NewSharedIndexInformer(
		// Both list and watch are restricted to secrets carrying the multicluster label.
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			opts.LabelSelector = MultiClusterSecretLabel + "=true"
			return kubeclientset.CoreV1().Secrets(namespace).List(context.TODO(), opts)
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			opts.LabelSelector = MultiClusterSecretLabel + "=true"
			return kubeclientset.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
		// Object type, resync period (0 — presumably no periodic resync; confirm), and no extra indexers.
		&corev1.Secret{}, 0, cache.Indexers{},
	// init gauges
	// NOTE(review): no gauge initialization follows this comment here — confirm whether
	// localClusters/remoteClusters should be recorded at this point.
	controller := &Controller{
		namespace:          namespace,
		localClusterID:     localClusterID,
		localClusterClient: kubeclientset,
		cs:                 newClustersStore(),
		informer:           secretsInformer,
		// Polling interval used by Run while waiting for the informer cache to sync.
		syncInterval: 100 * time.Millisecond,
	// The queue is created after controller so its reconciler can be bound to controller.processItem.
	controller.queue = controllers.NewQueue("multicluster secret", controllers.WithReconciler(controller.processItem))
	// NOTE(review): nothing visible in this function routes secretsInformer events into
	// controller.queue; without that, processItem is never called — confirm the handler is registered.
	return controller
// Run starts the controller until it receives a message over stopCh
//
// It first notifies every handler (via handleAdd) about the local cluster, synchronously, and
// returns an error if any handler fails. It then starts the secrets informer in a background
// goroutine and waits for its cache to sync, polling every c.syncInterval.
func (c *Controller) Run(stopCh <-chan struct{}) error {
	// run handlers for the local cluster; do not store this *Cluster in the ClusterStore or give it a SyncTimeout
	// this is done outside the goroutine, we should block other Run/startFuncs until this is registered
	localCluster := &Cluster{Client: c.localClusterClient, ID: c.localClusterID}
	if err := c.handleAdd(localCluster, stopCh); err != nil {
		return fmt.Errorf("failed initializing local cluster %s: %v", c.localClusterID, err)
	go func() {
		t0 := time.Now()
		log.Info("Starting multicluster remote secrets controller")
		go c.informer.Run(stopCh)
		if !kube.WaitForCacheSyncInterval(stopCh, c.syncInterval, c.informer.HasSynced) {
			log.Error("Failed to sync multicluster remote secrets controller cache")
			// NOTE(review): no return or closing brace follows this error log in this copy, so
			// execution appears to fall through to the "synced" log below — confirm.
		log.Infof("multicluster remote secrets controller cache synced in %v", time.Since(t0))
		// A zero features.RemoteClusterTimeout means no remote sync timeout is scheduled.
		if features.RemoteClusterTimeout != 0 {
			// NOTE(review): the callback body is empty in this copy; presumably it marks
			// c.remoteSyncTimeout (which every remote Cluster shares as SyncTimeout) — confirm.
			time.AfterFunc(features.RemoteClusterTimeout, func() {
		// NOTE(review): nothing visible here starts c.queue, whose HasSynced gates hasSynced — confirm.
	return nil
func (c *Controller) hasSynced() bool {
if !c.queue.HasSynced() {
log.Debug("secret controller did not sync secrets presented at startup")
// we haven't finished processing the secrets that were present at startup
return false
defer c.cs.RUnlock()
for _, clusterMap := range c.cs.remoteClusters {
for _, cluster := range clusterMap {
if !cluster.HasSynced() {
log.Debugf("remote cluster %s registered informers have not been synced up yet", cluster.ID)
return false
return true
func (c *Controller) HasSynced() bool {
synced := c.hasSynced()
if synced {
return true
if c.remoteSyncTimeout.Load() {
c.once.Do(func() {
log.Errorf("remote clusters failed to sync after %v", features.RemoteClusterTimeout)
return true
return synced
func (c *Controller) processItem(key types.NamespacedName) error {
log.Infof("processing secret event for secret %s", key)
obj, exists, err := c.informer.GetIndexer().GetByKey(key.String())
if err != nil {
return fmt.Errorf("error fetching object %s error: %v", key, err)
if exists {
log.Debugf("secret %s exists in informer cache, processing it", key)
c.addSecret(key, obj.(*corev1.Secret))
} else {
log.Debugf("secret %s does not exist in informer cache, deleting it", key)
return nil
// BuildClientsFromConfig creates kube.Clients from the provided kubeconfig. This is overridden for testing only
var BuildClientsFromConfig = func(kubeConfig []byte) (kube.Client, error) {
if len(kubeConfig) == 0 {
return nil, errors.New("kubeconfig is empty")
rawConfig, err := clientcmd.Load(kubeConfig)
if err != nil {
return nil, fmt.Errorf("kubeconfig cannot be loaded: %v", err)
if err := clientcmd.Validate(*rawConfig); err != nil {
return nil, fmt.Errorf("kubeconfig is not valid: %v", err)
if err := sanitizeKubeConfig(*rawConfig, features.InsecureKubeConfigOptions); err != nil {
return nil, fmt.Errorf("kubeconfig is not allowed: %v", err)
clientConfig := clientcmd.NewDefaultClientConfig(*rawConfig, &clientcmd.ConfigOverrides{})
clients, err := kube.NewClient(clientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create kube clients: %v", err)
return clients, nil
// sanitizeKubeConfig sanitizes a kubeconfig file to strip out insecure settings which may leak
// confidential materials.
// See
func sanitizeKubeConfig(config api.Config, allowlist sets.Set) error {
for k, auths := range config.AuthInfos {
if ap := auths.AuthProvider; ap != nil {
// We currently are importing 5 authenticators: gcp, azure, exec, and openstack
switch ap.Name {
case "oidc":
// OIDC is safe as it doesn't read files or execute code.
// create-remote-secret specifically supports OIDC so its probably important to not break this.
if !allowlist.Contains(ap.Name) {
// All the others - gcp, azure, exec, and openstack - are unsafe
return fmt.Errorf("auth provider %s is not allowed", ap.Name)
if auths.ClientKey != "" && !allowlist.Contains("clientKey") {
return fmt.Errorf("clientKey is not allowed")
if auths.ClientCertificate != "" && !allowlist.Contains("clientCertificate") {
return fmt.Errorf("clientCertificate is not allowed")
if auths.TokenFile != "" && !allowlist.Contains("tokenFile") {
return fmt.Errorf("tokenFile is not allowed")
if auths.Exec != nil && !allowlist.Contains("exec") {
return fmt.Errorf("exec is not allowed")
// Reconstruct the AuthInfo so if a new field is added we will not include it without review
config.AuthInfos[k] = &api.AuthInfo{
// LocationOfOrigin: Not needed
ClientCertificate: auths.ClientCertificate,
ClientCertificateData: auths.ClientCertificateData,
ClientKey: auths.ClientKey,
ClientKeyData: auths.ClientKeyData,
Token: auths.Token,
TokenFile: auths.TokenFile,
Impersonate: auths.Impersonate,
ImpersonateGroups: auths.ImpersonateGroups,
ImpersonateUserExtra: auths.ImpersonateUserExtra,
Username: auths.Username,
Password: auths.Password,
AuthProvider: auths.AuthProvider, // Included because it is sanitized above
Exec: auths.Exec,
// Extensions: Not needed,
// Other relevant fields that are not acted on:
// * Cluster.Server (and ProxyURL). This allows the user to send requests to arbitrary URLs, enabling potential SSRF attacks.
// However, we don't actually know what valid URLs are, so we cannot reasonably constrain this. Instead,
// we try to limit what confidential information could be exfiltrated (from AuthInfo). Additionally, the user cannot control
// the paths we send requests to, limiting potential attack scope.
// * Cluster.CertificateAuthority. While this reads from files, the result is not attached to the request and is instead
// entirely local
return nil
func (c *Controller) createRemoteCluster(kubeConfig []byte, clusterID string) (*Cluster, error) {
clients, err := BuildClientsFromConfig(kubeConfig)
if err != nil {
return nil, err
return &Cluster{
ID: cluster.ID(clusterID),
Client: clients,
stop: make(chan struct{}),
// for use inside the package, to close on cleanup
initialSync: atomic.NewBool(false),
SyncTimeout: &c.remoteSyncTimeout,
kubeConfigSha: sha256.Sum256(kubeConfig),
}, nil
func (c *Controller) addSecret(name types.NamespacedName, s *corev1.Secret) {
secretKey := name.String()
// First delete clusters
existingClusters := c.cs.GetExistingClustersFor(secretKey)
for _, existingCluster := range existingClusters {
if _, ok := s.Data[string(existingCluster.ID)]; !ok {
c.deleteCluster(secretKey, existingCluster.ID)
for clusterID, kubeConfig := range s.Data {
if cluster.ID(clusterID) == c.localClusterID {
log.Infof("ignoring cluster %v from secret %v as it would overwrite the local cluster", clusterID, secretKey)
action, callback := "Adding", c.handleAdd
if prev := c.cs.Get(secretKey, cluster.ID(clusterID)); prev != nil {
action, callback = "Updating", c.handleUpdate
// clusterID must be unique even across multiple secrets
kubeConfigSha := sha256.Sum256(kubeConfig)
if bytes.Equal(kubeConfigSha[:], prev.kubeConfigSha[:]) {
log.Infof("skipping update of cluster_id=%v from secret=%v: (kubeconfig are identical)", clusterID, secretKey)
// stop previous remote cluster
} else if c.cs.Contains(cluster.ID(clusterID)) {
// if the cluster has been registered before by another secret, ignore the new one.
log.Warnf("cluster %d from secret %s has already been registered", clusterID, secretKey)
log.Infof("%s cluster %v from secret %v", action, clusterID, secretKey)
remoteCluster, err := c.createRemoteCluster(kubeConfig, clusterID)
if err != nil {
log.Errorf("%s cluster_id=%v from secret=%v: %v", action, clusterID, secretKey, err)
c.cs.Store(secretKey, remoteCluster.ID, remoteCluster)
if err := callback(remoteCluster, remoteCluster.stop); err != nil {
log.Errorf("%s cluster_id from secret=%v: %s %v", action, clusterID, secretKey, err)
log.Infof("finished callback for %s and starting to sync", clusterID)
go remoteCluster.Run()
log.Infof("Number of remote clusters: %d", c.cs.Len())
func (c *Controller) deleteSecret(secretKey string) {
for _, cluster := range c.cs.GetExistingClustersFor(secretKey) {
if cluster.ID == c.localClusterID {
log.Infof("ignoring delete cluster %v from secret %v as it would overwrite the local cluster", c.localClusterID, secretKey)
log.Infof("Deleting cluster_id=%v configured by secret=%v", cluster.ID, secretKey)
err := c.handleDelete(cluster.ID)
if err != nil {
log.Errorf("Error removing cluster_id=%v configured by secret=%v: %v",
cluster.ID, secretKey, err)
c.cs.Delete(secretKey, cluster.ID)
log.Infof("Number of remote clusters: %d", c.cs.Len())
func (c *Controller) deleteCluster(secretKey string, clusterID cluster.ID) {
defer func() {
log.Infof("Number of remote clusters: %d", c.cs.Len())
log.Infof("Deleting cluster_id=%v configured by secret=%v", clusterID, secretKey)
err := c.handleDelete(clusterID)
if err != nil {
log.Errorf("Error removing cluster_id=%v configured by secret=%v: %v",
clusterID, secretKey, err)
delete(c.cs.remoteClusters[secretKey], clusterID)
func (c *Controller) handleAdd(cluster *Cluster, stop <-chan struct{}) error {
var errs *multierror.Error
for _, handler := range c.handlers {
errs = multierror.Append(errs, handler.ClusterAdded(cluster, stop))
return errs.ErrorOrNil()
func (c *Controller) handleUpdate(cluster *Cluster, stop <-chan struct{}) error {
var errs *multierror.Error
for _, handler := range c.handlers {
errs = multierror.Append(errs, handler.ClusterUpdated(cluster, stop))
return errs.ErrorOrNil()
func (c *Controller) handleDelete(key cluster.ID) error {
var errs *multierror.Error
for _, handler := range c.handlers {
errs = multierror.Append(errs, handler.ClusterDeleted(key))
return errs.ErrorOrNil()
// ListRemoteClusters provides debug info about connected remote clusters.
func (c *Controller) ListRemoteClusters() []cluster.DebugInfo {
var out []cluster.DebugInfo
for secretName, clusters := range c.cs.All() {
for clusterID, c := range clusters {
syncStatus := "syncing"
if c.HasSynced() {
syncStatus = "synced"
} else if c.SyncDidTimeout() {
syncStatus = "timeout"
out = append(out, cluster.DebugInfo{
ID: clusterID,
SecretName: secretName,
SyncStatus: syncStatus,
return out
func (c *Controller) GetRemoteKubeClient(clusterID cluster.ID) kubernetes.Interface {
if remoteCluster := c.cs.GetByID(clusterID); remoteCluster != nil {
return remoteCluster.Client
return nil