// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multicluster
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"sync"
"time"
)
import (
"github.com/hashicorp/go-multierror"
"go.uber.org/atomic"
"istio.io/pkg/log"
"istio.io/pkg/monitoring"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/kube/controllers"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
const (
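// MultiClusterSecretLabel identifies the secrets that carry remote cluster kubeconfigs; the controller
// watches secrets labeled istio/multiCluster=true, whose Data entries map cluster IDs to kubeconfigs.
// Such secrets are typically created by istioctl's create-remote-secret command.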
MultiClusterSecretLabel = "istio/multiCluster"
)
func init() {
monitoring.MustRegister(timeouts)
monitoring.MustRegister(clustersCount)
}
var (
timeouts = monitoring.NewSum(
"remote_cluster_sync_timeouts_total",
"Number of times remote clusters took too long to sync, causing slow startup that excludes remote clusters.",
)
clusterType = monitoring.MustCreateLabel("cluster_type")
clustersCount = monitoring.NewGauge(
"istiod_managed_clusters",
"Number of clusters managed by istiod",
monitoring.WithLabels(clusterType),
)
localClusters = clustersCount.With(clusterType.Value("local"))
remoteClusters = clustersCount.With(clusterType.Value("remote"))
)
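// ClusterHandler is implemented by components that need to react to remote clusters being added, updated, or deleted.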
type ClusterHandler interface {
ClusterAdded(cluster *Cluster, stop <-chan struct{}) error
ClusterUpdated(cluster *Cluster, stop <-chan struct{}) error
ClusterDeleted(clusterID cluster.ID) error
}
// Controller is the controller implementation for Secret resources
type Controller struct {
namespace string
localClusterID cluster.ID
localClusterClient kube.Client
queue controllers.Queue
informer cache.SharedIndexInformer
cs *ClusterStore
handlers []ClusterHandler
once sync.Once
syncInterval time.Duration
remoteSyncTimeout atomic.Bool
}
// Cluster holds the kube client and sync state for a single cluster tracked by the Controller.
type Cluster struct {
// ID of the cluster.
ID cluster.ID
// SyncTimeout is marked after features.RemoteClusterTimeout.
SyncTimeout *atomic.Bool
// Client for accessing the cluster.
Client kube.Client
kubeConfigSha [sha256.Size]byte
stop chan struct{}
// initialSync is marked when RunAndWait completes
initialSync *atomic.Bool
}
// Stop returns a channel that is closed when the cluster is removed or the Controller that created the client is stopped.
// Client.RunAndWait is called using this channel.
func (r *Cluster) Stop() <-chan struct{} {
return r.stop
}
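// AddHandler registers a ClusterHandler to be notified of cluster add, update, and delete events.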
func (c *Controller) AddHandler(h ClusterHandler) {
log.Infof("handling remote clusters in %T", h)
c.handlers = append(c.handlers, h)
}
// Run starts the cluster's informers and waits for caches to sync. Once caches are synced, we mark the cluster synced.
// This should be called after each of the handlers has registered its informers, and should be run in a goroutine.
func (r *Cluster) Run() {
r.Client.RunAndWait(r.Stop())
r.initialSync.Store(true)
}
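// HasSynced returns true once the cluster's informers have completed their initial sync, or once the sync timeout has been reached.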
func (r *Cluster) HasSynced() bool {
return r.initialSync.Load() || r.SyncTimeout.Load()
}
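// SyncDidTimeout returns true if the sync timeout was reached before the cluster finished its initial sync.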
func (r *Cluster) SyncDidTimeout() bool {
return r.SyncTimeout.Load() && !r.HasSynced()
}
// ClusterStore is a collection of clusters
type ClusterStore struct {
sync.RWMutex
// keyed by secret key (ns/name) -> clusterID
remoteClusters map[string]map[cluster.ID]*Cluster
clusters sets.Set
}
// newClustersStore initializes the data structure used to store cluster information
func newClustersStore() *ClusterStore {
return &ClusterStore{
remoteClusters: make(map[string]map[cluster.ID]*Cluster),
clusters: sets.New(),
}
}
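// Store registers a remote cluster under the secret that declared it and its cluster ID.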
func (c *ClusterStore) Store(secretKey string, clusterID cluster.ID, value *Cluster) {
c.Lock()
defer c.Unlock()
if _, ok := c.remoteClusters[secretKey]; !ok {
c.remoteClusters[secretKey] = make(map[cluster.ID]*Cluster)
}
c.remoteClusters[secretKey][clusterID] = value
c.clusters.Insert(string(clusterID))
}
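// Delete removes a cluster from the store, dropping the secret's entry entirely once its last cluster is gone.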
func (c *ClusterStore) Delete(secretKey string, clusterID cluster.ID) {
c.Lock()
defer c.Unlock()
delete(c.remoteClusters[secretKey], clusterID)
c.clusters.Delete(string(clusterID))
if len(c.remoteClusters[secretKey]) == 0 {
delete(c.remoteClusters, secretKey)
}
}
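// Get returns the cluster registered for the given secret key and cluster ID, or nil if none exists.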
func (c *ClusterStore) Get(secretKey string, clusterID cluster.ID) *Cluster {
c.RLock()
defer c.RUnlock()
if _, ok := c.remoteClusters[secretKey]; !ok {
return nil
}
return c.remoteClusters[secretKey][clusterID]
}
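// Contains reports whether a cluster with the given ID has been registered by any secret.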
func (c *ClusterStore) Contains(clusterID cluster.ID) bool {
c.RLock()
defer c.RUnlock()
return c.clusters.Contains(string(clusterID))
}
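// GetByID returns the cluster with the given ID regardless of which secret registered it, or nil if it is unknown.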
func (c *ClusterStore) GetByID(clusterID cluster.ID) *Cluster {
c.RLock()
defer c.RUnlock()
for _, clusters := range c.remoteClusters {
c, ok := clusters[clusterID]
if ok {
return c
}
}
return nil
}
// All returns a copy of the current remote clusters.
func (c *ClusterStore) All() map[string]map[cluster.ID]*Cluster {
if c == nil {
return nil
}
c.RLock()
defer c.RUnlock()
out := make(map[string]map[cluster.ID]*Cluster, len(c.remoteClusters))
for secret, clusters := range c.remoteClusters {
out[secret] = make(map[cluster.ID]*Cluster, len(clusters))
for cid, c := range clusters {
outCluster := *c
out[secret][cid] = &outCluster
}
}
return out
}
// GetExistingClustersFor returns the existing clusters registered for the given secret
func (c *ClusterStore) GetExistingClustersFor(secretKey string) []*Cluster {
c.RLock()
defer c.RUnlock()
out := make([]*Cluster, 0, len(c.remoteClusters[secretKey]))
for _, cluster := range c.remoteClusters[secretKey] {
out = append(out, cluster)
}
return out
}
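// Len returns the total number of remote clusters across all secrets.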
func (c *ClusterStore) Len() int {
c.Lock()
defer c.Unlock()
out := 0
for _, clusterMap := range c.remoteClusters {
out += len(clusterMap)
}
return out
}
// NewController returns a new secret controller
func NewController(kubeclientset kube.Client, namespace string, localClusterID cluster.ID) *Controller {
secretsInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.LabelSelector = MultiClusterSecretLabel + "=true"
return kubeclientset.CoreV1().Secrets(namespace).List(context.TODO(), opts)
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.LabelSelector = MultiClusterSecretLabel + "=true"
return kubeclientset.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
},
},
&corev1.Secret{}, 0, cache.Indexers{},
)
// init gauges
localClusters.Record(1.0)
remoteClusters.Record(0.0)
controller := &Controller{
namespace: namespace,
localClusterID: localClusterID,
localClusterClient: kubeclientset,
cs: newClustersStore(),
informer: secretsInformer,
syncInterval: 100 * time.Millisecond,
}
controller.queue = controllers.NewQueue("multicluster secret", controllers.WithReconciler(controller.processItem))
secretsInformer.AddEventHandler(controllers.ObjectHandler(controller.queue.AddObject))
return controller
}
// Run starts the controller and runs until stopCh is closed
func (c *Controller) Run(stopCh <-chan struct{}) error {
// run handlers for the local cluster; do not store this *Cluster in the ClusterStore or give it a SyncTimeout
// this is done outside the goroutine so that other Run/startFuncs are blocked until the local cluster is registered
localCluster := &Cluster{Client: c.localClusterClient, ID: c.localClusterID}
if err := c.handleAdd(localCluster, stopCh); err != nil {
return fmt.Errorf("failed initializing local cluster %s: %v", c.localClusterID, err)
}
go func() {
t0 := time.Now()
log.Info("Starting multicluster remote secrets controller")
go c.informer.Run(stopCh)
if !kube.WaitForCacheSyncInterval(stopCh, c.syncInterval, c.informer.HasSynced) {
log.Error("Failed to sync multicluster remote secrets controller cache")
return
}
log.Infof("multicluster remote secrets controller cache synced in %v", time.Since(t0))
if features.RemoteClusterTimeout != 0 {
time.AfterFunc(features.RemoteClusterTimeout, func() {
c.remoteSyncTimeout.Store(true)
})
}
c.queue.Run(stopCh)
}()
return nil
}
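// hasSynced reports whether the secrets present at startup have been processed and every registered remote cluster has completed its initial sync.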
func (c *Controller) hasSynced() bool {
if !c.queue.HasSynced() {
log.Debug("secret controller did not sync secrets presented at startup")
// we haven't finished processing the secrets that were present at startup
return false
}
c.cs.RLock()
defer c.cs.RUnlock()
for _, clusterMap := range c.cs.remoteClusters {
for _, cluster := range clusterMap {
if !cluster.HasSynced() {
log.Debugf("remote cluster %s registered informers have not been synced up yet", cluster.ID)
return false
}
}
}
return true
}
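// HasSynced returns true once all remote clusters have synced, or immediately once the remote cluster sync timeout has elapsed.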
func (c *Controller) HasSynced() bool {
synced := c.hasSynced()
if synced {
return true
}
if c.remoteSyncTimeout.Load() {
c.once.Do(func() {
log.Errorf("remote clusters failed to sync after %v", features.RemoteClusterTimeout)
timeouts.Increment()
})
return true
}
return synced
}
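// processItem reconciles a single secret: secrets present in the informer cache have their clusters added or updated, and deleted secrets have their clusters removed.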
func (c *Controller) processItem(key types.NamespacedName) error {
log.Infof("processing secret event for secret %s", key)
obj, exists, err := c.informer.GetIndexer().GetByKey(key.String())
if err != nil {
return fmt.Errorf("error fetching object %s error: %v", key, err)
}
if exists {
log.Debugf("secret %s exists in informer cache, processing it", key)
c.addSecret(key, obj.(*corev1.Secret))
} else {
log.Debugf("secret %s does not exist in informer cache, deleting it", key)
c.deleteSecret(key.String())
}
remoteClusters.Record(float64(c.cs.Len()))
return nil
}
// BuildClientsFromConfig creates a kube.Client from the provided kubeconfig. This is overridden for testing only
var BuildClientsFromConfig = func(kubeConfig []byte) (kube.Client, error) {
if len(kubeConfig) == 0 {
return nil, errors.New("kubeconfig is empty")
}
rawConfig, err := clientcmd.Load(kubeConfig)
if err != nil {
return nil, fmt.Errorf("kubeconfig cannot be loaded: %v", err)
}
if err := clientcmd.Validate(*rawConfig); err != nil {
return nil, fmt.Errorf("kubeconfig is not valid: %v", err)
}
if err := sanitizeKubeConfig(*rawConfig, features.InsecureKubeConfigOptions); err != nil {
return nil, fmt.Errorf("kubeconfig is not allowed: %v", err)
}
clientConfig := clientcmd.NewDefaultClientConfig(*rawConfig, &clientcmd.ConfigOverrides{})
clients, err := kube.NewClient(clientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create kube clients: %v", err)
}
return clients, nil
}
// sanitizeKubeConfig sanitizes a kubeconfig file to strip out insecure settings which may leak
// confidential materials.
// See https://github.com/kubernetes/kubectl/issues/697
func sanitizeKubeConfig(config api.Config, allowlist sets.Set) error {
for k, auths := range config.AuthInfos {
if ap := auths.AuthProvider; ap != nil {
// We currently are importing 5 authenticators: oidc, gcp, azure, exec, and openstack
switch ap.Name {
case "oidc":
// OIDC is safe as it doesn't read files or execute code.
// create-remote-secret specifically supports OIDC, so it's probably important to not break this.
default:
if !allowlist.Contains(ap.Name) {
// All the others - gcp, azure, exec, and openstack - are unsafe
return fmt.Errorf("auth provider %s is not allowed", ap.Name)
}
}
}
if auths.ClientKey != "" && !allowlist.Contains("clientKey") {
return fmt.Errorf("clientKey is not allowed")
}
if auths.ClientCertificate != "" && !allowlist.Contains("clientCertificate") {
return fmt.Errorf("clientCertificate is not allowed")
}
if auths.TokenFile != "" && !allowlist.Contains("tokenFile") {
return fmt.Errorf("tokenFile is not allowed")
}
if auths.Exec != nil && !allowlist.Contains("exec") {
return fmt.Errorf("exec is not allowed")
}
// Reconstruct the AuthInfo so if a new field is added we will not include it without review
config.AuthInfos[k] = &api.AuthInfo{
// LocationOfOrigin: Not needed
ClientCertificate: auths.ClientCertificate,
ClientCertificateData: auths.ClientCertificateData,
ClientKey: auths.ClientKey,
ClientKeyData: auths.ClientKeyData,
Token: auths.Token,
TokenFile: auths.TokenFile,
Impersonate: auths.Impersonate,
ImpersonateGroups: auths.ImpersonateGroups,
ImpersonateUserExtra: auths.ImpersonateUserExtra,
Username: auths.Username,
Password: auths.Password,
AuthProvider: auths.AuthProvider, // Included because it is sanitized above
Exec: auths.Exec,
// Extensions: Not needed,
}
// Other relevant fields that are not acted on:
// * Cluster.Server (and ProxyURL). This allows the user to send requests to arbitrary URLs, enabling potential SSRF attacks.
// However, we don't actually know what valid URLs are, so we cannot reasonably constrain this. Instead,
// we try to limit what confidential information could be exfiltrated (from AuthInfo). Additionally, the user cannot control
// the paths we send requests to, limiting potential attack scope.
// * Cluster.CertificateAuthority. While this reads from files, the result is not attached to the request and is instead
// entirely local
}
return nil
}
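// createRemoteCluster builds a Cluster from the given kubeconfig bytes, creating its kube client and recording the kubeconfig's sha256 so unchanged updates can be skipped.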
func (c *Controller) createRemoteCluster(kubeConfig []byte, clusterID string) (*Cluster, error) {
clients, err := BuildClientsFromConfig(kubeConfig)
if err != nil {
return nil, err
}
return &Cluster{
ID: cluster.ID(clusterID),
Client: clients,
// stop is for use inside the package, to close on cleanup
stop: make(chan struct{}),
initialSync: atomic.NewBool(false),
SyncTimeout: &c.remoteSyncTimeout,
kubeConfigSha: sha256.Sum256(kubeConfig),
}, nil
}
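// addSecret reconciles the clusters declared in a secret: clusters dropped from the secret are deleted, new entries are added, and entries whose kubeconfig changed are updated.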
func (c *Controller) addSecret(name types.NamespacedName, s *corev1.Secret) {
secretKey := name.String()
// First, delete any clusters that are no longer listed in the secret
existingClusters := c.cs.GetExistingClustersFor(secretKey)
for _, existingCluster := range existingClusters {
if _, ok := s.Data[string(existingCluster.ID)]; !ok {
c.deleteCluster(secretKey, existingCluster.ID)
}
}
for clusterID, kubeConfig := range s.Data {
if cluster.ID(clusterID) == c.localClusterID {
log.Infof("ignoring cluster %v from secret %v as it would overwrite the local cluster", clusterID, secretKey)
continue
}
action, callback := "Adding", c.handleAdd
if prev := c.cs.Get(secretKey, cluster.ID(clusterID)); prev != nil {
action, callback = "Updating", c.handleUpdate
// clusterID must be unique even across multiple secrets
kubeConfigSha := sha256.Sum256(kubeConfig)
if bytes.Equal(kubeConfigSha[:], prev.kubeConfigSha[:]) {
log.Infof("skipping update of cluster_id=%v from secret=%v: (kubeconfig are identical)", clusterID, secretKey)
continue
}
// stop previous remote cluster
prev.Stop()
} else if c.cs.Contains(cluster.ID(clusterID)) {
// if the cluster has been registered before by another secret, ignore the new one.
log.Warnf("cluster %d from secret %s has already been registered", clusterID, secretKey)
continue
}
log.Infof("%s cluster %v from secret %v", action, clusterID, secretKey)
remoteCluster, err := c.createRemoteCluster(kubeConfig, clusterID)
if err != nil {
log.Errorf("%s cluster_id=%v from secret=%v: %v", action, clusterID, secretKey, err)
continue
}
c.cs.Store(secretKey, remoteCluster.ID, remoteCluster)
if err := callback(remoteCluster, remoteCluster.stop); err != nil {
log.Errorf("%s cluster_id from secret=%v: %s %v", action, clusterID, secretKey, err)
continue
}
log.Infof("finished callback for %s and starting to sync", clusterID)
go remoteCluster.Run()
}
log.Infof("Number of remote clusters: %d", c.cs.Len())
}
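// deleteSecret removes all remote clusters that were registered by the given secret.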
func (c *Controller) deleteSecret(secretKey string) {
for _, cluster := range c.cs.GetExistingClustersFor(secretKey) {
if cluster.ID == c.localClusterID {
log.Infof("ignoring delete cluster %v from secret %v as it would overwrite the local cluster", c.localClusterID, secretKey)
continue
}
log.Infof("Deleting cluster_id=%v configured by secret=%v", cluster.ID, secretKey)
close(cluster.stop)
err := c.handleDelete(cluster.ID)
if err != nil {
log.Errorf("Error removing cluster_id=%v configured by secret=%v: %v",
cluster.ID, secretKey, err)
}
c.cs.Delete(secretKey, cluster.ID)
}
log.Infof("Number of remote clusters: %d", c.cs.Len())
}
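// deleteCluster stops a single cluster registered by the given secret, notifies handlers of the deletion, and removes it from the secret's entry in the store.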
func (c *Controller) deleteCluster(secretKey string, clusterID cluster.ID) {
c.cs.Lock()
defer func() {
c.cs.Unlock()
log.Infof("Number of remote clusters: %d", c.cs.Len())
}()
log.Infof("Deleting cluster_id=%v configured by secret=%v", clusterID, secretKey)
close(c.cs.remoteClusters[secretKey][clusterID].stop)
err := c.handleDelete(clusterID)
if err != nil {
log.Errorf("Error removing cluster_id=%v configured by secret=%v: %v",
clusterID, secretKey, err)
}
delete(c.cs.remoteClusters[secretKey], clusterID)
}
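// handleAdd invokes ClusterAdded on every registered handler and aggregates any errors.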
func (c *Controller) handleAdd(cluster *Cluster, stop <-chan struct{}) error {
var errs *multierror.Error
for _, handler := range c.handlers {
errs = multierror.Append(errs, handler.ClusterAdded(cluster, stop))
}
return errs.ErrorOrNil()
}
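// handleUpdate invokes ClusterUpdated on every registered handler and aggregates any errors.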
func (c *Controller) handleUpdate(cluster *Cluster, stop <-chan struct{}) error {
var errs *multierror.Error
for _, handler := range c.handlers {
errs = multierror.Append(errs, handler.ClusterUpdated(cluster, stop))
}
return errs.ErrorOrNil()
}
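// handleDelete invokes ClusterDeleted on every registered handler and aggregates any errors.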
func (c *Controller) handleDelete(key cluster.ID) error {
var errs *multierror.Error
for _, handler := range c.handlers {
errs = multierror.Append(errs, handler.ClusterDeleted(key))
}
return errs.ErrorOrNil()
}
// ListRemoteClusters provides debug info about connected remote clusters.
func (c *Controller) ListRemoteClusters() []cluster.DebugInfo {
var out []cluster.DebugInfo
for secretName, clusters := range c.cs.All() {
for clusterID, c := range clusters {
syncStatus := "syncing"
if c.HasSynced() {
syncStatus = "synced"
} else if c.SyncDidTimeout() {
syncStatus = "timeout"
}
out = append(out, cluster.DebugInfo{
ID: clusterID,
SecretName: secretName,
SyncStatus: syncStatus,
})
}
}
return out
}
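// GetRemoteKubeClient returns the Kubernetes client for the given remote cluster, or nil if the cluster is not registered.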
func (c *Controller) GetRemoteKubeClient(clusterID cluster.ID) kubernetes.Interface {
if remoteCluster := c.cs.GetByID(clusterID); remoteCluster != nil {
return remoteCluster.Client
}
return nil
}