blob: b227959118bd1dd74b1d1abc0c764312b36ca7e0 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package sds implements secret discovery service in NodeAgent.
package sds
import (
"context"
"fmt"
"time"
)
import (
"github.com/cenkalti/backoff/v4"
cryptomb "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/private_key_providers/cryptomb/v3alpha"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
sds "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
mesh "istio.io/api/mesh/v1alpha1"
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
"github.com/apache/dubbo-go-pixiu/pkg/security"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
// sdsServiceLog is the logging scope, registered under the name "sds", that this package logs to.
var sdsServiceLog = log.RegisterScope("sds", "SDS service debugging", 0)
// sdsservice answers Envoy SDS requests by generating secrets through a security.SecretManager
// and serving them over an XDS discovery server.
type sdsservice struct {
// st generates the secrets returned to clients.
st security.SecretManager
// XdsServer is the discovery server that StreamSecrets delegates to.
XdsServer *xds.DiscoveryServer
// stop is closed by Close; it is handed to the XDS server and watched by the certificate
// warm-up goroutine started in newSDSService.
stop chan struct{}
// rootCaPath is copied from security.Options.CARootPath and passed to toEnvoySecret.
rootCaPath string
// pkpConf is the private key provider configuration passed to toEnvoySecret.
pkpConf *mesh.PrivateKeyProvider
}
// Compile-time check that *sdsservice satisfies the XDS resource generator interface.
var _ model.XdsResourceGenerator = (*sdsservice)(nil)
// NewXdsServer builds an XDS discovery server via xds.NewXDS, registers gen as the generator
// for the secret type, installs a push filter so that a proxy is only pushed to when one of
// the secrets it watches was updated, starts the server and returns it.
func NewXdsServer(stop chan struct{}, gen model.XdsResourceGenerator) *xds.DiscoveryServer {
	simple := xds.NewXDS(stop)
	ds := simple.DiscoveryServer

	ds.Generators = map[string]model.XdsResourceGenerator{
		v3.SecretType: gen,
	}

	ds.ProxyNeedsPush = func(proxy *model.Proxy, req *model.PushRequest) bool {
		// A request that names no configs stands for "everything changed".
		if len(req.ConfigsUpdated) == 0 {
			return true
		}

		// Snapshot the watched secret names while holding the proxy read lock.
		var watchedNames []string
		proxy.RLock()
		if wr := proxy.WatchedResources[v3.SecretType]; wr != nil {
			watchedNames = wr.ResourceNames
		}
		proxy.RUnlock()

		if watchedNames == nil {
			return false
		}

		// Push as soon as any updated Secret config matches a watched name.
		watchedSet := sets.New(watchedNames...)
		for key := range model.ConfigsOfKind(req.ConfigsUpdated, gvk.Secret) {
			if watchedSet.Contains(key.Name) {
				return true
			}
		}
		return false
	}

	ds.Start(stop)
	return ds
}
// newSDSService creates the Secret Discovery Service, which implements the Envoy SDS API.
// Unless options.FileMountedCerts is set, it also starts a background goroutine that
// pre-generates the workload certificate and then the root certificate, retrying with
// exponential backoff until each succeeds or the service's stop channel is closed.
func newSDSService(st security.SecretManager, options *security.Options, pkpConf *mesh.PrivateKeyProvider) *sdsservice {
	svc := &sdsservice{
		st:      st,
		stop:    make(chan struct{}),
		pkpConf: pkpConf,
	}
	svc.XdsServer = NewXdsServer(svc.stop, svc)
	svc.rootCaPath = options.CARootPath

	if options.FileMountedCerts {
		return svc
	}

	// Warming the certificates up front improves startup latency and guarantees that a
	// certificate is always written in the OUTPUT_CERTS case. A workload may legitimately run
	// without any mTLS/CA configured, in which case this keeps failing; if that turns out to be
	// noisy the whole SDS server should be disabled for such workloads.
	go func() {
		// A single backoff is shared by both warm-up phases and never gives up on its own.
		retry := backoff.NewExponentialBackOff()
		retry.MaxElapsedTime = 0

		// warm retries GenerateSecret for resourceName until it succeeds (returns true) or the
		// service is stopped (returns false), logging each failure with failureFormat.
		warm := func(resourceName, failureFormat string) bool {
			for {
				_, err := st.GenerateSecret(resourceName)
				if err == nil {
					return true
				}
				sdsServiceLog.Warnf(failureFormat, err)
				select {
				case <-svc.stop:
					return false
				case <-time.After(retry.NextBackOff()):
				}
			}
		}

		if !warm(security.WorkloadKeyCertResourceName, "failed to warm certificate: %v") {
			return
		}
		warm(security.RootCertReqResourceName, "failed to warm root certificate: %v")
	}()

	return svc
}
// generate produces one discovery resource per requested name, in request order, by asking
// the secret manager for each secret and converting it to its Envoy representation. The first
// failure aborts the whole call and is returned as an error.
func (s *sdsservice) generate(resourceNames []string) (model.Resources, error) {
	out := model.Resources{}
	for _, name := range resourceNames {
		item, err := s.st.GenerateSecret(name)
		if err != nil {
			// Istiod normally does not fail a push because one resource could not be generated.
			// Here it is reasonable: a stream usually carries a single resource, so an error does
			// not take down a multiplexed set of resources, and failures are mostly transient
			// networking problems reaching the CA rather than configuration problems that would
			// trigger a fresh update once fixed. The client is expected to retry with backoff.
			return nil, fmt.Errorf("failed to generate secret for %v: %v", name, err)
		}
		out = append(out, &discovery.Resource{
			Name:     name,
			Resource: util.MessageToAny(toEnvoySecret(item, s.rootCaPath, s.pkpConf)),
		})
	}
	return out, nil
}
// Generate implements the XDS Generator interface, letting the XDS server dispatch requests
// for the secret type to this service to build the Envoy response.
//
// For a full push every watched resource is generated. Otherwise only the updated Secret
// configs whose names are watched are generated. In practice pushes are expected to be
// incremental (e.g. a change to the `default` cert does not re-push all file certs).
func (s *sdsservice) Generate(proxy *model.Proxy, w *model.WatchedResource, updates *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
	targets := w.ResourceNames
	if !updates.Full {
		// Narrow the push down to the watched secrets that actually changed.
		watched := sets.New(w.ResourceNames...)
		targets = []string{}
		for key := range updates.ConfigsUpdated {
			if key.Kind != gvk.Secret || !watched.Contains(key.Name) {
				continue
			}
			targets = append(targets, key.Name)
		}
	}
	resources, err := s.generate(targets)
	return resources, pushLog(targets), err
}
// register adds the SDS handler to the gRPC server by registering s as the
// SecretDiscoveryService implementation on rpcs.
func (s *sdsservice) register(rpcs *grpc.Server) {
sds.RegisterSecretDiscoveryServiceServer(rpcs, s)
}
// StreamSecrets serves SDS discovery requests and SDS push requests by handing the stream to
// the underlying XDS server and returning whatever error it reports.
func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecretsServer) error {
return s.XdsServer.Stream(stream)
}
// DeltaSecrets is not supported; it always returns a gRPC Unimplemented status error.
func (s *sdsservice) DeltaSecrets(stream sds.SecretDiscoveryService_DeltaSecretsServer) error {
return status.Error(codes.Unimplemented, "DeltaSecrets not implemented")
}
// FetchSecrets is not supported; it always returns a nil response and a gRPC Unimplemented
// status error.
func (s *sdsservice) FetchSecrets(ctx context.Context, discReq *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
return nil, status.Error(codes.Unimplemented, "FetchSecrets not implemented")
}
// Close signals shutdown by closing the stop channel and then shuts down the XDS server.
// NOTE(review): closing an already-closed channel panics, so this looks like it must be called
// at most once per service — confirm against callers.
func (s *sdsservice) Close() {
// Closing stop also ends the certificate warm-up goroutine if it is still retrying.
close(s.stop)
s.XdsServer.Shutdown()
}
// toEnvoySecret converts a security.SecretItem to an Envoy tls.Secret.
//
// The returned secret is named after s.ResourceName. It carries a validation context holding
// s.RootCert when the resource is security.RootCertReqResourceName, or when the certificate
// config parsed from the resource name reports a root certificate. For
// security.FileRootSystemCACert the config is derived from caRootPath instead of the name.
//
// Otherwise it carries a TLS certificate built from s.CertificateChain. The private key is
// sent inline, unless pkpConf selects the CryptoMB provider, in which case it is wrapped in a
// "cryptomb" private key provider config together with the configured poll delay.
func toEnvoySecret(s *security.SecretItem, caRootPath string, pkpConf *mesh.PrivateKeyProvider) *tls.Secret {
	secret := &tls.Secret{
		Name: s.ResourceName,
	}

	var (
		cfg security.SdsCertificateConfig
		ok  bool
	)
	if s.ResourceName == security.FileRootSystemCACert {
		cfg, ok = security.SdsCertificateConfigFromResourceNameForOSCACert(caRootPath)
	} else {
		cfg, ok = security.SdsCertificateConfigFromResourceName(s.ResourceName)
	}

	if s.ResourceName == security.RootCertReqResourceName || (ok && cfg.IsRootCertificate()) {
		secret.Type = &tls.Secret_ValidationContext{
			ValidationContext: &tls.CertificateValidationContext{
				TrustedCa: &core.DataSource{
					Specifier: &core.DataSource_InlineBytes{
						InlineBytes: s.RootCert,
					},
				},
			},
		}
		return secret
	}

	// Both key-delivery variants share the same inline certificate chain.
	tlsCert := &tls.TlsCertificate{
		CertificateChain: &core.DataSource{
			Specifier: &core.DataSource_InlineBytes{
				InlineBytes: s.CertificateChain,
			},
		},
	}
	privateKey := &core.DataSource{
		Specifier: &core.DataSource_InlineBytes{
			InlineBytes: s.PrivateKey,
		},
	}

	switch pkpConf.GetProvider().(type) {
	case *mesh.PrivateKeyProvider_Cryptomb:
		// Read the delay through the generated getters: they tolerate an unset (nil) poll delay,
		// where direct field access would panic, and combining seconds with nanos keeps delays
		// of one second or more from being truncated to their sub-second part.
		configured := pkpConf.GetCryptomb().GetPollDelay()
		pollDelay := time.Duration(configured.GetSeconds())*time.Second +
			time.Duration(configured.GetNanos())*time.Nanosecond

		tlsCert.PrivateKeyProvider = &tls.PrivateKeyProvider{
			ProviderName: "cryptomb",
			ConfigType: &tls.PrivateKeyProvider_TypedConfig{
				TypedConfig: util.MessageToAny(&cryptomb.CryptoMbPrivateKeyMethodConfig{
					PollDelay:  durationpb.New(pollDelay),
					PrivateKey: privateKey,
				}),
			},
		}
	default:
		tlsCert.PrivateKey = privateKey
	}

	secret.Type = &tls.Secret_TlsCertificate{TlsCertificate: tlsCert}
	return secret
}
// pushLog builds the log details attached to a push. When exactly one resource is pushed (the
// common case) its name is included; for any other count the default details are returned.
func pushLog(names []string) model.XdsLogDetails {
	if len(names) != 1 {
		return model.DefaultXdsLogDetails
	}
	return model.XdsLogDetails{AdditionalInfo: "resource:" + names[0]}
}