blob: da7a8b5b3507a1e04bff80094dc31e165d6175c3 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kube
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
)
import (
"istio.io/pkg/log"
authorizationv1 "k8s.io/api/authorization/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1"
listersv1 "k8s.io/client-go/listers/core/v1"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/credentials"
securitymodel "github.com/apache/dubbo-go-pixiu/pilot/pkg/security/model"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/kube/controllers"
)
// Data keys under which certificate material is looked up in Kubernetes secrets.
const (
// GenericScrtCert is the ID/name for the certificate chain in kubernetes generic secret.
GenericScrtCert = "cert"
// GenericScrtKey is the ID/name for the private key in kubernetes generic secret.
GenericScrtKey = "key"
// GenericScrtCaCert is the ID/name for the CA certificate in kubernetes generic secret.
GenericScrtCaCert = "cacert"
// TLSSecretCert is the ID/name for the certificate chain in kubernetes tls secret.
TLSSecretCert = "tls.crt"
// TLSSecretKey is the ID/name for the private key in kubernetes tls secret.
TLSSecretKey = "tls.key"
// TLSSecretCaCert is the ID/name for the CA certificate in kubernetes tls secret.
TLSSecretCaCert = "ca.crt"
)
// CredentialsController serves certificate and docker credentials read from an
// informer-backed secret lister, and authorizes callers through
// SubjectAccessReview requests whose results are cached.
type CredentialsController struct {
// secretInformer is the shared secret informer; event handlers are registered on it.
secretInformer cache.SharedIndexInformer
// secretLister reads secrets from the informer's indexer.
secretLister listersv1.SecretLister
// sar is the client used to create SubjectAccessReviews when authorizing.
sar authorizationv1client.SubjectAccessReviewInterface
// clusterID is the cluster ID the controller was constructed with.
clusterID cluster.ID
// mu is held while reading or writing authorizationCache.
mu sync.RWMutex
// authorizationCache holds recent authorization results keyed by user name.
authorizationCache map[authorizationKey]authorizationResponse
}
// authorizationKey is the key type of the authorization cache; keys are built from the user name.
type authorizationKey string
// authorizationResponse is a single cached authorization result.
type authorizationResponse struct {
// expiration is the instant after which the entry is evicted from the cache.
expiration time.Time
// authorized is the cached outcome: nil when access was granted, otherwise
// the error that is returned to the caller.
authorized error
}
// Compile-time check that *CredentialsController implements credentials.Controller.
var _ credentials.Controller = &CredentialsController{}
// NewCredentialsController builds a CredentialsController for clusterID. It is
// backed by a filtered secret informer obtained from client and by the
// client's SubjectAccessReview interface, and starts with an empty
// authorization cache.
func NewCredentialsController(client kube.Client, clusterID cluster.ID) *CredentialsController {
	// We only care about TLS certificates and docker config for Wasm image pulling.
	// Unfortunately, it is not as simple as selecting type=kubernetes.io/tls and type=kubernetes.io/dockerconfigjson.
	// Because of legacy reasons and supporting an extra ca.crt, we also support generic types.
	// It's also likely users have started to use random types and expect them to continue working.
	// This makes the assumption we will never care about Helm secrets or SA token secrets - two common
	// large secrets in clusters.
	// This is a best effort optimization only; the code would behave correctly if we watched all secrets.
	skipLargeSecrets := func(options *metav1.ListOptions) {
		notHelm := fields.OneTermNotEqualSelector("type", "helm.sh/release.v1")
		notSAToken := fields.OneTermNotEqualSelector("type", string(v1.SecretTypeServiceAccountToken))
		options.FieldSelector = fields.AndSelectors(notHelm, notSAToken).String()
	}

	// Factory handed to the shared informer machinery; it watches secrets in
	// all namespaces and indexes them by namespace.
	newSecretInformer := func(k kubernetes.Interface, resync time.Duration) cache.SharedIndexInformer {
		indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
		return informersv1.NewFilteredSecretInformer(k, metav1.NamespaceAll, resync, indexers, skipLargeSecrets)
	}

	informer := client.KubeInformer().InformerFor(&v1.Secret{}, newSecretInformer)

	controller := &CredentialsController{
		secretInformer:     informer,
		secretLister:       listersv1.NewSecretLister(informer.GetIndexer()),
		sar:                client.AuthorizationV1().SubjectAccessReviews(),
		clusterID:          clusterID,
		authorizationCache: make(map[authorizationKey]authorizationResponse),
	}
	return controller
}
// toUser returns the user name of a service account in the form
// "system:serviceaccount:<namespace>:<serviceAccount>".
func toUser(serviceAccount, namespace string) string {
	const userFormat = "system:serviceaccount:%s:%s"
	return fmt.Sprintf(userFormat, namespace, serviceAccount)
}
// cacheTTL is the base lifetime of an authorization cache entry; insertCache
// keeps successful results for a multiple of it.
const cacheTTL = time.Minute
// clearExpiredCache iterates through the authorization cache and removes all
// entries whose expiration lies in the past. Should be called with mutex held.
func (s *CredentialsController) clearExpiredCache() {
	// Read the clock once: every entry is compared against the same instant,
	// and we avoid a time.Now() call per cache entry while holding the lock.
	now := time.Now()
	for k, v := range s.authorizationCache {
		if v.expiration.Before(now) {
			// Deleting the current key while ranging over a map is safe in Go.
			delete(s.authorizationCache, k)
		}
	}
}
// cachedAuthorization looks up the cached authorization result for user.
// The boolean reports whether an entry was found; when it is true, the error
// is the cached outcome (nil meaning the user was authorized).
// nolint
func (s *CredentialsController) cachedAuthorization(user string) (error, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Evict stale entries first, so anything still present needs no separate
	// expiration check.
	s.clearExpiredCache()

	if entry, ok := s.authorizationCache[authorizationKey(user)]; ok {
		return entry.authorized, true
	}
	return nil, false
}
// insertCache stores the authorization result for user in the cache.
// A non-nil response (denial or lookup failure) is kept for cacheTTL; a nil
// response (success) is kept five times as long.
func (s *CredentialsController) insertCache(user string, response error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	ttl := cacheTTL
	if response == nil {
		// Cache success a bit longer, there is no need to quickly revoke access
		ttl = 5 * cacheTTL
	}
	log.Debugf("cached authorization for user %s: %v", user, response)

	entry := authorizationResponse{
		expiration: time.Now().Add(ttl),
		authorized: response,
	}
	s.authorizationCache[authorizationKey(user)] = entry
}
// DisableAuthorizationForTest makes the authorization check always pass by
// prepending a reactor to the fake clientset that answers every
// SubjectAccessReview creation with an allowed status. Should be used only
// for tests.
func DisableAuthorizationForTest(fake *fake.Clientset) {
	allowAll := func(k8stesting.Action) (bool, runtime.Object, error) {
		review := &authorizationv1.SubjectAccessReview{}
		review.Status.Allowed = true
		return true, review, nil
	}
	fake.Fake.PrependReactor("create", "subjectaccessreviews", allowAll)
}
// Authorize checks whether the given service account may list secrets in
// namespace. It returns nil when access is allowed, and an error when the
// SubjectAccessReview request fails or reports the access as not allowed.
// The outcome, success or failure, is cached per user and served from the
// cache on later calls until it expires.
func (s *CredentialsController) Authorize(serviceAccount, namespace string) error {
	user := toUser(serviceAccount, namespace)
	if result, hit := s.cachedAuthorization(user); hit {
		return result
	}

	review := &authorizationv1.SubjectAccessReview{
		Spec: authorizationv1.SubjectAccessReviewSpec{
			User: user,
			ResourceAttributes: &authorizationv1.ResourceAttributes{
				Namespace: namespace,
				Verb:      "list",
				Resource:  "secrets",
			},
		},
	}

	// outcome stays nil unless the request fails or access is denied.
	var outcome error
	resp, err := s.sar.Create(context.Background(), review, metav1.CreateOptions{})
	switch {
	case err != nil:
		outcome = err
	case !resp.Status.Allowed:
		outcome = fmt.Errorf("%s/%s is not authorized to read secrets: %v", serviceAccount, namespace, resp.Status.Reason)
	}

	s.insertCache(user, outcome)
	return outcome
}
// GetKeyAndCert returns the private key and certificate chain held by the
// secret namespace/name. It returns an error when the lister cannot provide
// the secret or when the secret has no usable key/certificate pair.
func (s *CredentialsController) GetKeyAndCert(name, namespace string) (key []byte, cert []byte, err error) {
	secrets := s.secretLister.Secrets(namespace)
	secret, lookupErr := secrets.Get(name)
	if lookupErr != nil {
		return nil, nil, fmt.Errorf("secret %v/%v not found", namespace, name)
	}
	return extractKeyAndCert(secret)
}
// GetCaCert returns the CA certificate for the secret namespace/name. The
// secret is looked up under the exact name first; if that lookup fails, the
// name with securitymodel.SdsCaSuffix trimmed is tried instead. An error is
// returned when neither secret is available or the one found has no usable CA
// certificate.
func (s *CredentialsController) GetCaCert(name, namespace string) (cert []byte, err error) {
	if exact, lookupErr := s.secretLister.Secrets(namespace).Get(name); lookupErr == nil {
		return extractRoot(exact)
	}

	// Could not fetch cert, look for secret without -cacert suffix
	strippedName := strings.TrimSuffix(name, securitymodel.SdsCaSuffix)
	fallback, lookupErr := s.secretLister.Secrets(namespace).Get(strippedName)
	if lookupErr != nil {
		return nil, fmt.Errorf("secret %v/%v not found", namespace, strippedName)
	}
	return extractRoot(fallback)
}
// GetDockerCredential returns the docker config stored in the secret
// namespace/name. The secret must be available from the lister, have type
// v1.SecretTypeDockerConfigJson and contain the v1.DockerConfigJsonKey data
// entry; otherwise an error describing the failed condition is returned.
func (s *CredentialsController) GetDockerCredential(name, namespace string) ([]byte, error) {
	secret, err := s.secretLister.Secrets(namespace).Get(name)
	switch {
	case err != nil || secret == nil:
		return nil, fmt.Errorf("secret %v/%v not found", namespace, name)
	case secret.Type != v1.SecretTypeDockerConfigJson:
		return nil, fmt.Errorf("type of secret %v/%v is not %v", namespace, name, v1.SecretTypeDockerConfigJson)
	}

	cred, found := secret.Data[v1.DockerConfigJsonKey]
	if !found {
		return nil, fmt.Errorf("cannot find docker config at secret %v/%v", namespace, name)
	}
	return cred, nil
}
// hasKeys reports whether every one of keys is present in d, regardless of
// the value stored under it. It returns true when no keys are given.
func hasKeys(d map[string][]byte, keys ...string) bool {
	for i := range keys {
		if _, present := d[keys[i]]; !present {
			return false
		}
	}
	return true
}
// hasValue reports whether every one of keys maps to a non-empty value in d.
// A missing key counts as empty. It returns true when no keys are given.
func hasValue(d map[string][]byte, keys ...string) bool {
	for _, name := range keys {
		if len(d[name]) == 0 {
			return false
		}
	}
	return true
}
// extractKeyAndCert extracts the server key and certificate from scrt.
// The generic key names are tried before the TLS key names, and a pair is
// only used when both of its entries are non-empty. If neither pair is
// usable, the returned error says whether the expected keys were present but
// empty, or absent (listing some of the keys that were found).
func extractKeyAndCert(scrt *v1.Secret) (key, cert []byte, err error) {
	data := scrt.Data
	switch {
	case hasValue(data, GenericScrtCert, GenericScrtKey):
		return data[GenericScrtKey], data[GenericScrtCert], nil
	case hasValue(data, TLSSecretCert, TLSSecretKey):
		return data[TLSSecretKey], data[TLSSecretCert], nil

	// No cert found. Try to generate a helpful error message
	case hasKeys(data, GenericScrtCert, GenericScrtKey):
		return nil, nil, fmt.Errorf("found keys %q and %q, but they were empty", GenericScrtCert, GenericScrtKey)
	case hasKeys(data, TLSSecretCert, TLSSecretKey):
		return nil, nil, fmt.Errorf("found keys %q and %q, but they were empty", TLSSecretCert, TLSSecretKey)
	}

	present := truncatedKeysMessage(data)
	return nil, nil, fmt.Errorf("found secret, but didn't have expected keys (%s and %s) or (%s and %s); found: %s",
		GenericScrtCert, GenericScrtKey, TLSSecretCert, TLSSecretKey, present)
}
// truncatedKeysMessage returns a comma-separated, alphabetically sorted list
// of the keys of data, for use in error messages. At most three keys are
// listed; when there are more, the rest are summarized as ", and N more...".
// An empty or nil map yields the empty string.
func truncatedKeysMessage(data map[string][]byte) string {
	const maxShown = 3

	keys := make([]string, 0, len(data))
	for k := range data {
		keys = append(keys, k)
	}
	// Map iteration order is random; sort for a deterministic message.
	sort.Strings(keys)

	// Use <= so that exactly maxShown keys are listed in full instead of being
	// followed by a meaningless ", and 0 more...".
	if len(keys) <= maxShown {
		return strings.Join(keys, ", ")
	}
	return fmt.Sprintf("%s, and %d more...", strings.Join(keys[:maxShown], ", "), len(keys)-maxShown)
}
// extractRoot extracts the root (CA) certificate from scrt.
// The generic key name is tried before the TLS key name, and only a
// non-empty entry is used. If neither is usable, the returned error says
// whether an expected key was present but empty, or absent (listing some of
// the keys that were found).
func extractRoot(scrt *v1.Secret) (cert []byte, err error) {
	data := scrt.Data
	switch {
	case hasValue(data, GenericScrtCaCert):
		return data[GenericScrtCaCert], nil
	case hasValue(data, TLSSecretCaCert):
		return data[TLSSecretCaCert], nil

	// No cert found. Try to generate a helpful error message
	case hasKeys(data, GenericScrtCaCert):
		return nil, fmt.Errorf("found key %q, but it was empty", GenericScrtCaCert)
	case hasKeys(data, TLSSecretCaCert):
		return nil, fmt.Errorf("found key %q, but it was empty", TLSSecretCaCert)
	}

	present := truncatedKeysMessage(data)
	return nil, fmt.Errorf("found secret, but didn't have expected keys %s or %s; found: %s",
		GenericScrtCaCert, TLSSecretCaCert, present)
}
// AddEventHandler registers h on the secret informer. h is wrapped with
// controllers.ObjectHandler and is invoked with the name and namespace of the
// object that the wrapper passes in.
func (s *CredentialsController) AddEventHandler(h func(name string, namespace string)) {
	notify := func(obj controllers.Object) {
		h(obj.GetName(), obj.GetNamespace())
	}
	// register handler before informer starts
	s.secretInformer.AddEventHandler(controllers.ObjectHandler(notify))
}