blob: a6d8e35d238e30e73939103104ccb380dd379049 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bootstrap
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/adsc"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
configaggregate "github.com/apache/dubbo-kubernetes/sail/pkg/config/aggregate"
"github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/crdclient"
"github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/file"
"github.com/apache/dubbo-kubernetes/sail/pkg/config/memory"
dubboCredentials "github.com/apache/dubbo-kubernetes/sail/pkg/credentials"
"github.com/apache/dubbo-kubernetes/sail/pkg/credentials/kube"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"istio.io/api/networking/v1alpha3"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"net/url"
"strings"
)
type ConfigSourceAddressScheme string
const (
File ConfigSourceAddressScheme = "fs"
XDS ConfigSourceAddressScheme = "xds"
Kubernetes ConfigSourceAddressScheme = "k8s"
)
func (s *Server) initConfigController(args *SailArgs) error {
meshConfig := s.environment.Mesh()
if len(meshConfig.ConfigSources) > 0 {
// Using MCP for config.
if err := s.initConfigSources(args); err != nil {
return err
}
} else if args.RegistryOptions.FileDir != "" {
// Local files - should be added even if other options are specified
configController, err := file.NewController(
args.RegistryOptions.FileDir,
args.RegistryOptions.KubeOptions.DomainSuffix,
collections.Sail,
args.RegistryOptions.KubeOptions,
)
if err != nil {
return err
}
s.ConfigStores = append(s.ConfigStores, configController)
} else {
err := s.initK8SConfigStore(args)
if err != nil {
return err
}
}
// TODO ingress controller
// TODO addTerminatingStartFunc
// Wrap the config controller with a cache.
aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)
if err != nil {
return err
}
s.configController = aggregateConfigController
// Create the config store.
s.environment.ConfigStore = aggregateConfigController
// Defer starting the controller until after the service is created.
s.addStartFunc("config controller", func(stop <-chan struct{}) error {
go s.configController.Run(stop)
return nil
})
return nil
}
// initConfigSources will process mesh config 'configSources' and initialize
// associated configs.
func (s *Server) initConfigSources(args *SailArgs) (err error) {
for _, configSource := range s.environment.Mesh().ConfigSources {
srcAddress, err := url.Parse(configSource.Address)
if err != nil {
return fmt.Errorf("invalid config URL %s %v", configSource.Address, err)
}
scheme := ConfigSourceAddressScheme(srcAddress.Scheme)
switch scheme {
case File:
if srcAddress.Path == "" {
return fmt.Errorf("invalid fs config URL %s, contains no file path", configSource.Address)
}
configController, err := file.NewController(
srcAddress.Path,
args.RegistryOptions.KubeOptions.DomainSuffix,
collections.Sail,
args.RegistryOptions.KubeOptions,
)
if err != nil {
return err
}
s.ConfigStores = append(s.ConfigStores, configController)
klog.Infof("Started File configSource %s", configSource.Address)
case XDS:
transportCredentials, err := s.getTransportCredentials(args, configSource.TlsSettings)
if err != nil {
return fmt.Errorf("failed to read transport credentials from config: %v", err)
}
xdsMCP, err := adsc.New(srcAddress.Host, &adsc.ADSConfig{
InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
Config: adsc.Config{
Namespace: args.Namespace,
Workload: args.PodName,
Revision: "", // TODO
Meta: nil,
GrpcOpts: []grpc.DialOption{
args.KeepaliveOptions.ConvertToClientOption(),
grpc.WithTransportCredentials(transportCredentials),
},
},
})
if err != nil {
return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)
}
store := memory.Make(collections.Sail)
// TODO: enable namespace filter for memory controller
configController := memory.NewController(store)
configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
xdsMCP.Store = configController
err = xdsMCP.Run()
if err != nil {
return fmt.Errorf("MCP: failed running %v", err)
}
s.ConfigStores = append(s.ConfigStores, configController)
klog.Infof("Started XDS configSource %s", configSource.Address)
case Kubernetes:
if srcAddress.Path == "" || srcAddress.Path == "/" {
err2 := s.initK8SConfigStore(args)
if err2 != nil {
klog.Errorf("Error loading k8s: %v", err2)
return err2
}
klog.Infof("Started Kubernetes configSource %s", configSource.Address)
} else {
klog.Infof("Not implemented, ignore: %v", configSource.Address)
// TODO: handle k8s:// scheme for remote cluster. Use same mechanism as service registry,
// using the cluster name as key to match a secret.
}
default:
klog.Infof("Ignoring unsupported config source: %v", configSource.Address)
}
}
return nil
}
func (s *Server) makeKubeConfigController(args *SailArgs) *crdclient.Client {
opts := crdclient.Option{
DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
Identifier: "crd-controller",
KrtDebugger: args.KrtDebugger,
}
schemas := collections.Sail
return crdclient.NewForSchemas(s.kubeClient, opts, schemas)
}
func (s *Server) initK8SConfigStore(args *SailArgs) error {
if s.kubeClient == nil {
return nil
}
configController := s.makeKubeConfigController(args)
s.ConfigStores = append(s.ConfigStores, configController)
s.XDSServer.ConfigUpdate(&model.PushRequest{
Full: true,
Reason: model.NewReasonStats(model.GlobalUpdate),
Forced: true,
})
return nil
}
// getTransportCredentials attempts to create credentials.TransportCredentials from ClientTLSSettings in mesh config
// Implemented only for SIMPLE_TLS mode
// TODO:
//
// Implement for MUTUAL_TLS/DUBBO_MUTUAL_TLS modes
func (s *Server) getTransportCredentials(args *SailArgs, tlsSettings *v1alpha3.ClientTLSSettings) (credentials.TransportCredentials, error) {
// TODO ValidateTLS
switch tlsSettings.GetMode() {
case v1alpha3.ClientTLSSettings_SIMPLE:
if len(tlsSettings.GetCredentialName()) > 0 {
rootCert, err := s.getRootCertFromSecret(tlsSettings.GetCredentialName(), args.Namespace)
if err != nil {
return nil, err
}
tlsSettings.CaCertificates = string(rootCert.Cert)
tlsSettings.CaCrl = string(rootCert.CRL)
}
if tlsSettings.GetInsecureSkipVerify().GetValue() || len(tlsSettings.GetCaCertificates()) == 0 {
return credentials.NewTLS(&tls.Config{
ServerName: tlsSettings.GetSni(),
InsecureSkipVerify: tlsSettings.GetInsecureSkipVerify().GetValue(), // nolint
}), nil
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM([]byte(tlsSettings.GetCaCertificates())) {
return nil, fmt.Errorf("failed to add ca certificate from configSource.tlsSettings to pool")
}
return credentials.NewTLS(&tls.Config{
ServerName: tlsSettings.GetSni(),
InsecureSkipVerify: tlsSettings.GetInsecureSkipVerify().GetValue(), // nolint
RootCAs: certPool,
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
return s.verifyCert(rawCerts, tlsSettings)
},
}), nil
default:
return insecure.NewCredentials(), nil
}
}
// verifyCert verifies given cert against TLS settings like SANs and CRL.
func (s *Server) verifyCert(certs [][]byte, tlsSettings *v1alpha3.ClientTLSSettings) error {
if len(certs) == 0 {
return fmt.Errorf("no certificates provided")
}
cert, err := x509.ParseCertificate(certs[0])
if err != nil {
return fmt.Errorf("failed to parse certificate: %w", err)
}
if len(tlsSettings.SubjectAltNames) > 0 {
sanMatchFound := false
for _, san := range cert.DNSNames {
if sanMatchFound {
break
}
for _, name := range tlsSettings.SubjectAltNames {
if san == name {
sanMatchFound = true
break
}
}
}
if !sanMatchFound {
return fmt.Errorf("no matching SAN found")
}
}
if len(tlsSettings.CaCrl) > 0 {
crlData := []byte(strings.TrimSpace(tlsSettings.CaCrl))
block, _ := pem.Decode(crlData)
if block != nil {
crlData = block.Bytes
}
crl, err := x509.ParseRevocationList(crlData)
if err != nil {
return fmt.Errorf("failed to parse CRL: %w", err)
}
for _, revokedCert := range crl.RevokedCertificateEntries {
if cert.SerialNumber.Cmp(revokedCert.SerialNumber) == 0 {
return fmt.Errorf("certificate is revoked")
}
}
}
return nil
}
// getRootCertFromSecret fetches a map of keys and values from a secret with name in namespace
func (s *Server) getRootCertFromSecret(name, namespace string) (*dubboCredentials.CertInfo, error) {
secret, err := s.kubeClient.Kube().CoreV1().Secrets(namespace).Get(context.Background(), name, v1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get credential with name %v: %v", name, err)
}
return kube.ExtractRoot(secret.Data)
}