blob: 02ba059b652782c24e6468279c5a45242c7215c2 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kube
import (
"fmt"
"sync"
)
import (
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/credentials"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/kube/multicluster"
)
type secretHandler func(name string, namespace string)
// Multicluster structure holds the remote kube Controllers and multicluster specific attributes.
type Multicluster struct {
remoteKubeControllers map[cluster.ID]*CredentialsController
m sync.Mutex // protects remoteKubeControllers
localCluster cluster.ID
secretHandlers []secretHandler
}
var _ credentials.MulticlusterController = &Multicluster{}
func NewMulticluster(localCluster cluster.ID) *Multicluster {
m := &Multicluster{
remoteKubeControllers: map[cluster.ID]*CredentialsController{},
localCluster: localCluster,
}
return m
}
func (m *Multicluster) ClusterAdded(cluster *multicluster.Cluster, _ <-chan struct{}) error {
log.Infof("initializing Kubernetes credential reader for cluster %v", cluster.ID)
sc := NewCredentialsController(cluster.Client, cluster.ID)
m.m.Lock()
m.remoteKubeControllers[cluster.ID] = sc
for _, onCredential := range m.secretHandlers {
sc.AddEventHandler(onCredential)
}
m.m.Unlock()
return nil
}
func (m *Multicluster) ClusterUpdated(cluster *multicluster.Cluster, stop <-chan struct{}) error {
if err := m.ClusterDeleted(cluster.ID); err != nil {
return err
}
if err := m.ClusterAdded(cluster, stop); err != nil {
return err
}
return nil
}
func (m *Multicluster) ClusterDeleted(key cluster.ID) error {
m.m.Lock()
delete(m.remoteKubeControllers, key)
m.m.Unlock()
return nil
}
func (m *Multicluster) ForCluster(clusterID cluster.ID) (credentials.Controller, error) {
if _, f := m.remoteKubeControllers[clusterID]; !f {
return nil, fmt.Errorf("cluster %v is not configured", clusterID)
}
agg := &AggregateController{}
agg.controllers = []*CredentialsController{}
agg.authController = m.remoteKubeControllers[clusterID]
if clusterID != m.localCluster {
// If the request cluster is not the local cluster, we will append it and use it for auth
// This means we will prioritize the proxy cluster, then the local cluster for credential lookup
// Authorization will always use the proxy cluster.
agg.controllers = append(agg.controllers, m.remoteKubeControllers[clusterID])
}
agg.controllers = append(agg.controllers, m.remoteKubeControllers[m.localCluster])
return agg, nil
}
func (m *Multicluster) AddSecretHandler(h secretHandler) {
m.secretHandlers = append(m.secretHandlers, h)
for _, c := range m.remoteKubeControllers {
c.AddEventHandler(h)
}
}
type AggregateController struct {
// controllers to use to look up certs. Generally this will consistent of the local (config) cluster
// and a single remote cluster where the proxy resides
controllers []*CredentialsController
authController *CredentialsController
}
var _ credentials.Controller = &AggregateController{}
func (a *AggregateController) GetKeyAndCert(name, namespace string) (key []byte, cert []byte, err error) {
// Search through all clusters, find first non-empty result
var firstError error
for _, c := range a.controllers {
k, c, err := c.GetKeyAndCert(name, namespace)
if err != nil {
if firstError == nil {
firstError = err
}
} else {
return k, c, nil
}
}
return nil, nil, firstError
}
func (a *AggregateController) GetCaCert(name, namespace string) (cert []byte, err error) {
// Search through all clusters, find first non-empty result
var firstError error
for _, c := range a.controllers {
k, err := c.GetCaCert(name, namespace)
if err != nil {
if firstError == nil {
firstError = err
}
} else {
return k, nil
}
}
return nil, firstError
}
func (a *AggregateController) Authorize(serviceAccount, namespace string) error {
return a.authController.Authorize(serviceAccount, namespace)
}
func (a *AggregateController) AddEventHandler(f func(name string, namespace string)) {
// no ops
}
func (a *AggregateController) GetDockerCredential(name, namespace string) ([]byte, error) {
// Search through all clusters, find first non-empty result
var firstError error
for _, c := range a.controllers {
k, err := c.GetDockerCredential(name, namespace)
if err != nil {
if firstError == nil {
firstError = err
}
} else {
return k, nil
}
}
return nil, firstError
}