blob: d3e3cdbbfa062cc8621565eb3a3d55185f5fb840 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package istioagent
import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"net"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
)
import (
"github.com/cenkalti/backoff/v4"
bootstrapv3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/golang/protobuf/proto" // nolint: staticcheck
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
mesh "istio.io/api/mesh/v1alpha1"
"istio.io/pkg/filewatcher"
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/config"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pkg/bootstrap"
"github.com/apache/dubbo-go-pixiu/pkg/bootstrap/platform"
"github.com/apache/dubbo-go-pixiu/pkg/config/constants"
dnsClient "github.com/apache/dubbo-go-pixiu/pkg/dns/client"
dnsProto "github.com/apache/dubbo-go-pixiu/pkg/dns/proto"
"github.com/apache/dubbo-go-pixiu/pkg/envoy"
"github.com/apache/dubbo-go-pixiu/pkg/istio-agent/grpcxds"
"github.com/apache/dubbo-go-pixiu/pkg/security"
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
"github.com/apache/dubbo-go-pixiu/security/pkg/nodeagent/cache"
"github.com/apache/dubbo-go-pixiu/security/pkg/nodeagent/caclient"
citadel "github.com/apache/dubbo-go-pixiu/security/pkg/nodeagent/caclient/providers/citadel"
gca "github.com/apache/dubbo-go-pixiu/security/pkg/nodeagent/caclient/providers/google"
cas "github.com/apache/dubbo-go-pixiu/security/pkg/nodeagent/caclient/providers/google-cas"
"github.com/apache/dubbo-go-pixiu/security/pkg/nodeagent/sds"
)
// To debug:
// curl -X POST localhost:15000/logging?config=trace - to see SendingDiscoveryRequest
// Set breakpoints in secretcache.go GenerateSecret.
// Note that istiod currently can't validate the JWT token unless it runs on k8s.
// The main problem is that the JWT validation check hardcodes the k8s server address and token location.
//
// To test on a local machine, for debugging:
//
// kis exec $POD -- cat /run/secrets/istio-token/istio-token > var/run/secrets/tokens/istio-token
// kis port-forward $POD 15010:15010 &
//
// You can also copy the K8S CA and a token to be used to connect to k8s - but will need removing the hardcoded addr
// kis exec $POD -- cat /run/secrets/kubernetes.io/serviceaccount/{ca.crt,token} > var/run/secrets/kubernetes.io/serviceaccount/
//
// Or disable the jwt validation while debugging SDS problems.
const (
	// k8sCAPath is where the Kubernetes service-account CA bundle is mounted.
	k8sCAPath = "./var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

	// CitadelCACertPath is the directory that holds the Citadel/istiod CA root
	// certificate, mounted from the 'istio-ca-root-cert' config map.
	// NOTE(review): older notes say startup may swap this for ./etc/certs when a
	// root-cert.pem is present there (non-Citadel CAs); since this is a constant,
	// any such substitution must happen elsewhere — confirm.
	CitadelCACertPath = "./var/run/secrets/istio"
)
// Proxy metadata keys whose values point at client TLS material when
// certificates are file-mounted.
const (
	// MetadataClientCertKey names the client private key file.
	MetadataClientCertKey = "ISTIO_META_TLS_CLIENT_KEY"
	// MetadataClientCertChain names the client certificate chain file.
	MetadataClientCertChain = "ISTIO_META_TLS_CLIENT_CERT_CHAIN"
	// MetadataClientRootCert names the root certificate file.
	MetadataClientRootCert = "ISTIO_META_TLS_CLIENT_ROOT_CERT"
)
// Agent contains the configuration of the agent, based on the injected
// environment:
// - SDS hostPath if node-agent was used
// - /etc/certs/key if Citadel or other mounted Secrets are used
// - root cert to use for connecting to XDS server
// - CA address, with proper defaults and detection
//
// It also holds the runtime components that Run creates (local DNS server,
// secret cache / SDS server, XDS proxy, Envoy agent); Close releases them.
type Agent struct {
	// proxyConfig is the mesh ProxyConfig passed to NewAgent.
	proxyConfig *mesh.ProxyConfig
	// cfg holds agent settings that are not part of ProxyConfig.
	cfg *AgentOptions
	// secOpts holds security settings; initSdsServer may overwrite its cert/key file paths.
	secOpts *security.Options
	// envoyOpts is back-filled from proxyConfig in initializeEnvoyAgent.
	envoyOpts envoy.ProxyConfig
	// envoyAgent runs Envoy; created by initializeEnvoyAgent.
	envoyAgent *envoy.Agent
	// envoyWaitCh is closed once Envoy may start, or receives an error if
	// applying a server-side bootstrap failed.
	envoyWaitCh chan error
	// sdsServer stays nil when an existing SDS socket is reused or Envoy is disabled.
	sdsServer *sds.Server
	// secretCache produces workload and root certificates.
	secretCache *cache.SecretManagerClient
	// Used when proxying envoy xds via istio-agent is enabled.
	xdsProxy *XdsProxy
	// caFileWatcher watches the XDS root CA file so dial options can be rebuilt on change.
	caFileWatcher filewatcher.FileWatcher
	// local DNS Server that processes DNS requests locally and forwards to upstream DNS if needed.
	localDNSServer *dnsClient.LocalDNSServer
	// Signals true completion (e.g. with delayed graceful termination of Envoy)
	wg sync.WaitGroup
}
// AgentOptions contains additional config for the agent, not included in ProxyConfig.
// Most are from env variables ( still experimental ) or for testing only.
// Eventually most non-test settings should graduate to ProxyConfig
// Please don't add 100 parameters to the NewAgent function (or any other)!
type AgentOptions struct {
	// ProxyXDSDebugViaAgent, if true, makes Run start the XDS debug interface on
	// ProxyXDSDebugViaAgentPort (historically 15004) and forward queries to XDS
	// istio.io/debug.
	// NOTE(review): the original note says this "Requires ProxyXDSViaAgent", which
	// is not a field of this struct — the reference looks stale.
	ProxyXDSDebugViaAgent bool
	// Port value for the debugging endpoint.
	ProxyXDSDebugViaAgentPort int
	// DNSCapture indicates if the XDS proxy has dns capture enabled or not.
	// The local DNS server is only created when this is set and ProxyType is a
	// sidecar.
	// NOTE(review): the original note said this is ignored when proxyXDSViaAgent
	// is false; no such option exists here any more.
	DNSCapture bool
	// DNSAddr is the DNS capture address. A ":53" suffix combined with running
	// as root is also treated as the permission-less (no iptables) mode.
	DNSAddr string
	// ProxyType is the type of proxy we are configured to handle
	ProxyType model.NodeType
	// ProxyNamespace to use for local dns resolution
	ProxyNamespace string
	// ProxyDomain is the DNS domain associated with the proxy (assumed
	// to include the namespace as well) (for local dns resolution)
	ProxyDomain string
	// Node identifier used by Envoy
	ServiceNode string
	// XDSRootCerts is the location of the root CA for the XDS connection. Used for setting platform certs or
	// using custom roots. The special value security.SystemRootCerts selects the system roots.
	XDSRootCerts string
	// CARootCerts is the location of the root CA for the CA connection. Used for setting platform certs or
	// using custom roots. The special value security.SystemRootCerts selects the system roots.
	CARootCerts string
	// Extra headers to add to the XDS connection.
	XDSHeaders map[string]string
	// Is the proxy an IPv6 proxy
	IsIPv6 bool
	// Path to local UDS to communicate with Envoy
	XdsUdsPath string
	// Ability to retrieve ProxyConfig dynamically through XDS
	EnableDynamicProxyConfig bool
	// All of the proxy's IP Addresses
	ProxyIPAddresses []string
	// Enables dynamic generation of bootstrap.
	EnableDynamicBootstrap bool
	// Envoy status port (that circles back to the agent status port). Really belongs to the proxy config.
	// Cannot be eradicated because mistakes have been made.
	EnvoyStatusPort int
	// Envoy prometheus port that circles back to its admin port for prom endpoint. Really belongs to the
	// proxy config.
	EnvoyPrometheusPort int
	// MinimumDrainDuration is passed to envoy.NewAgent.
	MinimumDrainDuration time.Duration
	// ExitOnZeroActiveConnections is passed to envoy.NewAgent and to the node metadata.
	ExitOnZeroActiveConnections bool
	// Cloud platform
	Platform platform.Environment
	// GRPCBootstrapPath if set will generate a file compatible with GRPC_XDS_BOOTSTRAP
	GRPCBootstrapPath string
	// Disables all envoy agent features
	DisableEnvoy bool
	// NOTE(review): presumably gRPC server options for the Envoy-facing XDS
	// server; not used in this file — confirm.
	DownstreamGrpcOptions []grpc.ServerOption
	// NOTE(review): presumably the expected istiod SAN for the XDS connection;
	// not used in this file — confirm.
	IstiodSAN string
	// NOTE(review): presumably registries allowed without TLS verification when
	// fetching WASM modules; not used in this file — confirm.
	WASMInsecureRegistries []string
}
// NewAgent builds an Agent from the mesh proxy config, the agent options, the
// security options and the Envoy options. Nothing is started here: the local
// SDS server, certificate signing clients, the local XDS proxy (with VM health
// checking and DNS proxying) and Envoy are brought up by Run. Only the CA file
// watcher is allocated up front.
func NewAgent(proxyConfig *mesh.ProxyConfig, agentOpts *AgentOptions, sopts *security.Options, eopts envoy.ProxyConfig) *Agent {
	agent := &Agent{caFileWatcher: filewatcher.NewWatcher()}
	agent.proxyConfig = proxyConfig
	agent.cfg = agentOpts
	agent.secOpts = sopts
	agent.envoyOpts = eopts
	return agent
}
// EnvoyDisabled reports whether Run will skip starting (and waiting for) Envoy:
// true in test-only mode or when AgentOptions.DisableEnvoy is set.
func (a *Agent) EnvoyDisabled() bool {
	if a.envoyOpts.TestOnly {
		return true
	}
	return a.cfg.DisableEnvoy
}
// WaitForSigterm reports whether Run will block until its context is done
// (intended to be triggered by SIGTERM or SIGINT). This holds when Envoy is
// disabled but the agent is not in test-only mode.
func (a *Agent) WaitForSigterm() bool {
	if a.envoyOpts.TestOnly {
		return false
	}
	return a.EnvoyDisabled()
}
// generateNodeMetadata assembles the node metadata used by both the Envoy and
// the gRPC bootstrap. It resolves the XDS root CA (falling back to the Debian
// system bundle when system roots are selected) and, under mutual TLS control
// plane auth, derives the expected Pilot SAN from the discovery address.
func (a *Agent) generateNodeMetadata() (*model.Node, error) {
	rootCert, err := a.FindRootCAForXDS()
	if err != nil {
		return nil, fmt.Errorf("failed to find root CA cert for XDS: %v", err)
	}
	if len(rootCert) == 0 {
		// Envoy can only load roots from a file, so "system certs" needs a concrete
		// path. Best guess: the Debian bundle. Other distributions must configure
		// the path explicitly.
		rootCert = "/etc/ssl/certs/ca-certificates.crt"
	}

	var pilotSAN []string
	if a.proxyConfig.ControlPlaneAuthPolicy == mesh.AuthenticationPolicy_MUTUAL_TLS {
		// Obtain Pilot SAN, using DNS.
		pilotSAN = append(pilotSAN, config.GetPilotSan(a.proxyConfig.DiscoveryAddress))
	}

	opts := bootstrap.MetadataOptions{
		ID:                          a.cfg.ServiceNode,
		Envs:                        os.Environ(),
		Platform:                    a.cfg.Platform,
		InstanceIPs:                 a.cfg.ProxyIPAddresses,
		StsPort:                     a.secOpts.STSPort,
		ProxyConfig:                 a.proxyConfig,
		PilotSubjectAltName:         pilotSAN,
		OutlierLogPath:              a.envoyOpts.OutlierLogPath,
		ProvCert:                    rootCert,
		EnvoyPrometheusPort:         a.cfg.EnvoyPrometheusPort,
		EnvoyStatusPort:             a.cfg.EnvoyStatusPort,
		ExitOnZeroActiveConnections: a.cfg.ExitOnZeroActiveConnections,
		XDSRootCert:                 a.cfg.XDSRootCerts,
	}
	return bootstrap.GetNodeMetaData(opts)
}
// initializeEnvoyAgent prepares, but does not start, the Envoy agent.
//
// It generates the node metadata and chooses the bootstrap config file: the
// ProxyConfig.CustomConfigFile if set, otherwise a file generated for epoch 0.
// It then back-fills envoyOpts from proxyConfig and constructs a.envoyAgent.
// It also creates a.envoyWaitCh, a buffered channel with capacity 1:
//   - without EnableDynamicBootstrap the channel is closed immediately, so Run
//     may start Envoy right away;
//   - with EnableDynamicBootstrap, a goroutine tracked by a.wg sends a simulated
//     bootstrap request through the XDS proxy. It repeats until a value or
//     close is observed on envoyWaitCh, or ctx is done. Between attempts it
//     waits a random delay below an upper bound that starts at 500ms, doubles,
//     and is capped at 30s.
//
// NOTE(review): envoyWaitCh has several readers: bootstrapDiscoveryRequest.Recv,
// the retry loop below, and the goroutine in Run. An error pushed by Send goes
// to whichever reads first. Whether a failed bootstrap is retried or reported
// therefore looks racy — confirm intended behavior.
func (a *Agent) initializeEnvoyAgent(ctx context.Context) error {
	node, err := a.generateNodeMetadata()
	if err != nil {
		return fmt.Errorf("failed to generate bootstrap metadata: %v", err)
	}
	log.Infof("Pilot SAN: %v", node.Metadata.PilotSubjectAltName)
	// Note: the cert checking still works, the generated file is updated if certs are changed.
	// We just don't save the generated file, but use a custom one instead. Pilot will keep
	// monitoring the certs and restart if the content of the certs changes.
	if len(a.proxyConfig.CustomConfigFile) > 0 {
		// there is a custom configuration. Don't write our own config - but keep watching the certs.
		a.envoyOpts.ConfigPath = a.proxyConfig.CustomConfigFile
		a.envoyOpts.ConfigCleanup = false
	} else {
		out, err := bootstrap.New(bootstrap.Config{
			Node: node,
		}).CreateFileForEpoch(0)
		if err != nil {
			return fmt.Errorf("failed to generate bootstrap config: %v", err)
		}
		a.envoyOpts.ConfigPath = out
		// The generated file belongs to the agent, so it may be cleaned up.
		a.envoyOpts.ConfigCleanup = true
	}
	// Back-fill envoy options from proxy config options
	a.envoyOpts.BinaryPath = a.proxyConfig.BinaryPath
	a.envoyOpts.AdminPort = a.proxyConfig.ProxyAdminPort
	a.envoyOpts.DrainDuration = a.proxyConfig.DrainDuration
	a.envoyOpts.ParentShutdownDuration = a.proxyConfig.ParentShutdownDuration
	a.envoyOpts.Concurrency = a.proxyConfig.Concurrency.GetValue()
	// Checking only uid should be sufficient - but tests also run as root and
	// will break due to permission errors if we start envoy as 1337.
	// This is a mode used for permission-less docker, where iptables can't be
	// used.
	a.envoyOpts.AgentIsRoot = os.Getuid() == 0 && strings.HasSuffix(a.cfg.DNSAddr, ":53")
	envoyProxy := envoy.NewProxy(a.envoyOpts)
	drainDuration := a.proxyConfig.TerminationDrainDuration.AsDuration()
	localHostAddr := localHostIPv4
	if a.cfg.IsIPv6 {
		localHostAddr = localHostIPv6
	}
	a.envoyAgent = envoy.NewAgent(envoyProxy, drainDuration, a.cfg.MinimumDrainDuration, localHostAddr,
		int(a.proxyConfig.ProxyAdminPort), a.cfg.EnvoyStatusPort, a.cfg.EnvoyPrometheusPort, a.cfg.ExitOnZeroActiveConnections)
	a.envoyWaitCh = make(chan error, 1)
	if a.cfg.EnableDynamicBootstrap {
		// Simulate an xDS request for a bootstrap
		a.wg.Add(1)
		go func() {
			defer a.wg.Done()
			// wait indefinitely and keep retrying with jittered exponential backoff
			// (backoff/max are milliseconds: the delay bound starts at 500ms and is capped at 30s)
			backoff := 500
			max := 30000
		retries:
			for {
				// handleStream holds on to the request after it exits, so create a fresh one instead.
				request := &bootstrapDiscoveryRequest{
					node:        node,
					envoyWaitCh: a.envoyWaitCh,
					envoyUpdate: envoyProxy.UpdateConfig,
				}
				_ = a.xdsProxy.handleStream(request)
				// Stop once anything (a close, or an error value) is observed on the channel.
				select {
				case <-a.envoyWaitCh:
					break retries
				default:
				}
				// Full jitter: a random delay in [0, backoff) milliseconds.
				delay := time.Duration(rand.Int()%backoff) * time.Millisecond
				log.Infof("retrying bootstrap discovery request with backoff: %v", delay)
				select {
				case <-ctx.Done():
					break retries
				case <-time.After(delay):
				}
				if backoff < max/2 {
					backoff *= 2
				} else {
					backoff = max
				}
			}
		}()
	} else {
		close(a.envoyWaitCh)
	}
	return nil
}
// bootstrapDiscoveryRequest is an in-process stand-in for a downstream XDS
// stream. initializeEnvoyAgent passes it to XdsProxy.handleStream to request a
// server-side Envoy bootstrap.
type bootstrapDiscoveryRequest struct {
	// node is converted into the XDS node carried by the bootstrap request.
	node *model.Node
	// envoyWaitCh is closed after the bootstrap is applied, or receives an error.
	envoyWaitCh chan error
	// envoyUpdate applies the bootstrap JSON (set to envoyProxy.UpdateConfig by initializeEnvoyAgent).
	envoyUpdate func(data []byte) error
	// sent is set once Recv has produced the bootstrap request.
	sent bool
	// received is set once Send has processed a bootstrap response.
	received bool
}
// Send accepts a DiscoveryResponse that the xDS proxy forwards to this fake
// downstream. Only the first bootstrap-typed response is handled. It must
// contain exactly one Bootstrap resource. That resource is re-encoded as
// indented JSON and handed to envoyUpdate, and envoyWaitCh is then closed.
// Any failure along the way is pushed onto envoyWaitCh instead of being
// returned, so the method always returns nil.
func (b *bootstrapDiscoveryRequest) Send(resp *discovery.DiscoveryResponse) error {
	if resp.TypeUrl != v3.BootstrapType || b.received {
		return nil
	}
	b.received = true

	report := func(err error) error {
		b.envoyWaitCh <- err
		return nil
	}

	if n := len(resp.Resources); n != 1 {
		return report(fmt.Errorf("unexpected number of bootstraps: %d", n))
	}
	var bs bootstrapv3.Bootstrap
	if err := resp.Resources[0].UnmarshalTo(&bs); err != nil {
		return report(fmt.Errorf("failed to unmarshal bootstrap: %v", err))
	}
	payload, err := protomarshal.MarshalIndent(&bs, " ")
	if err != nil {
		return report(fmt.Errorf("failed to marshal bootstrap as JSON: %v", err))
	}
	if err := b.envoyUpdate(payload); err != nil {
		return report(fmt.Errorf("failed to update bootstrap from discovery: %v", err))
	}
	close(b.envoyWaitCh)
	return nil
}
// Recv yields the requests this fake downstream sends to the xDS proxy. The
// first call returns a single bootstrap request for the node. Every later call
// blocks until envoyWaitCh delivers a value or is closed, and then returns
// io.EOF to end the stream.
func (b *bootstrapDiscoveryRequest) Recv() (*discovery.DiscoveryRequest, error) {
	if !b.sent {
		b.sent = true
		return &discovery.DiscoveryRequest{
			TypeUrl: v3.BootstrapType,
			Node:    bootstrap.ConvertNodeToXDSNode(b.node),
		}, nil
	}
	<-b.envoyWaitCh
	return nil, io.EOF
}
// Context returns a background context, so the simulated bootstrap stream is
// never cancelled through its context.
func (b *bootstrapDiscoveryRequest) Context() context.Context {
	return context.Background()
}
// Run is a non-blocking call which returns either an error or a function to await for completion.
//
// Startup order:
//  1. local DNS server;
//  2. SDS server, skipped when a responsive socket already exists at
//     security.WorkloadIdentitySocketPath;
//  3. XDS proxy and, optionally, its debug interface;
//  4. the gRPC XDS bootstrap file, when GRPCBootstrapPath is set;
//  5. the CA file watcher goroutine;
//  6. either Envoy (in a goroutine tracked by a.wg) or, when Envoy is disabled
//     outside test-only mode, a goroutine that just waits for ctx to be done.
//
// The returned function is a.wg.Wait.
func (a *Agent) Run(ctx context.Context) (func(), error) {
	var err error
	if err = a.initLocalDNSServer(); err != nil {
		return nil, fmt.Errorf("failed to start local DNS server: %v", err)
	}
	// checkSocket removes a stale (unresponsive) socket file and reports false for it.
	socketExists, err := checkSocket(ctx, security.WorkloadIdentitySocketPath)
	if err != nil {
		return nil, fmt.Errorf("failed to check SDS socket: %v", err)
	}
	if socketExists {
		log.Info("SDS socket found. Istio SDS Server won't be started")
	} else {
		log.Info("SDS socket not found. Starting Istio SDS Server")
		err = a.initSdsServer()
		if err != nil {
			return nil, fmt.Errorf("failed to start SDS server: %v", err)
		}
	}
	a.xdsProxy, err = initXdsProxy(a)
	if err != nil {
		return nil, fmt.Errorf("failed to start xds proxy: %v", err)
	}
	if a.cfg.ProxyXDSDebugViaAgent {
		err = a.xdsProxy.initDebugInterface(a.cfg.ProxyXDSDebugViaAgentPort)
		if err != nil {
			return nil, fmt.Errorf("failed to start istio tap server: %v", err)
		}
	}
	if a.cfg.GRPCBootstrapPath != "" {
		if err := a.generateGRPCBootstrap(); err != nil {
			return nil, fmt.Errorf("failed generating gRPC XDS bootstrap: %v", err)
		}
	}
	rootCAForXDS, err := a.FindRootCAForXDS()
	if err != nil {
		return nil, fmt.Errorf("failed to find root XDS CA: %v", err)
	}
	// NOTE(review): this goroutine is not tracked by a.wg; it exits when ctx is done.
	go a.caFileWatcherHandler(ctx, rootCAForXDS)
	if !a.EnvoyDisabled() {
		err = a.initializeEnvoyAgent(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to start envoy agent: %v", err)
		}
		a.wg.Add(1)
		go func() {
			defer a.wg.Done()
			if a.cfg.EnableDynamicBootstrap {
				// Wait for the server-side bootstrap: a close yields err == nil,
				// a pushed value is a failure.
				start := time.Now()
				var err error
				select {
				case err = <-a.envoyWaitCh:
				case <-ctx.Done():
					// Early cancellation before envoy started.
					return
				}
				if err != nil {
					log.Errorf("failed to write updated envoy bootstrap: %v", err)
					return
				}
				log.Infof("received server-side bootstrap in %v", time.Since(start))
			}
			// This is a blocking call for graceful termination.
			a.envoyAgent.Run(ctx)
		}()
	} else if a.WaitForSigterm() {
		// wait for SIGTERM and perform graceful shutdown
		a.wg.Add(1)
		go func() {
			defer a.wg.Done()
			<-ctx.Done()
		}()
	}
	return a.wg.Wait, nil
}
// initSdsServer creates the workload secret cache and, unless Envoy is
// disabled, the SDS server that serves it.
//
// If workload certificate files exist at the security.WorkloadIdentity* paths,
// the cache is backed by those files and no CA client is created. Otherwise
// newSecretManager picks the CA client (or file-mounted certs).
//
// With DisableEnvoy (proxyless), no SDS server is started. Instead, a
// background goroutine fetches the workload and root certificates, and fetches
// them again whenever the secret cache asks for a renewal.
func (a *Agent) initSdsServer() error {
	var err error
	if security.CheckWorkloadCertificate(security.WorkloadIdentityCertChainPath, security.WorkloadIdentityKeyPath, security.WorkloadIdentityRootCertPath) {
		log.Info("workload certificate files detected, creating secret manager without caClient")
		a.secOpts.RootCertFilePath = security.WorkloadIdentityRootCertPath
		a.secOpts.CertChainFilePath = security.WorkloadIdentityCertChainPath
		a.secOpts.KeyFilePath = security.WorkloadIdentityKeyPath
		a.secretCache, err = cache.NewSecretManagerClient(nil, a.secOpts)
	} else {
		a.secretCache, err = a.newSecretManager()
	}
	// One shared check for both branches. Previously each branch checked this,
	// and a third, unreachable copy followed. %w keeps the cause unwrappable
	// without changing the message text.
	if err != nil {
		return fmt.Errorf("failed to start workload secret manager %w", err)
	}

	if a.cfg.DisableEnvoy {
		// For proxyless we don't need an SDS server, but still need the keys and
		// we need them refreshed periodically.
		//
		// This is based on the code from newSDSService, but customized to have explicit rotation.
		go func() {
			st := a.secretCache
			st.RegisterSecretHandler(func(resourceName string) {
				// The secret handler is called when a secret should be renewed, after invalidating the cache.
				// The handler does not call GenerateSecret - it is a side-effect of the SDS generate() method, which
				// is called by sdsServer.OnSecretUpdate, which triggers a push and eventually calls sdsservice.Generate
				// TODO: extract the logic to detect expiration time, and use a simpler code to rotate to files.
				_, _ = a.getWorkloadCerts(st)
			})
			_, _ = a.getWorkloadCerts(st)
		}()
		return nil
	}

	pkpConf := a.proxyConfig.GetPrivateKeyProvider()
	a.sdsServer = sds.NewServer(a.secOpts, a.secretCache, pkpConf)
	a.secretCache.RegisterSecretHandler(a.sdsServer.OnSecretUpdate)
	return nil
}
// getWorkloadCerts blocks until the secret cache yields the workload key/cert
// pair and then the root certificate. Each fetch is retried using a single
// exponential backoff that never expires (MaxElapsedTime = 0). It returns the
// workload secret; the returned error is always nil.
//
// TODO: evaluate replacing the STS server with a file data source, to simplify Envoy config
func (a *Agent) getWorkloadCerts(st *cache.SecretManagerClient) (sk *security.SecretItem, err error) {
	retry := backoff.NewExponentialBackOff()
	retry.MaxElapsedTime = 0

	for {
		if sk, err = st.GenerateSecret(security.WorkloadKeyCertResourceName); err == nil {
			break
		}
		log.Warnf("failed to get certificate: %v", err)
		time.Sleep(retry.NextBackOff())
	}

	for {
		_, rootErr := st.GenerateSecret(security.RootCertReqResourceName)
		if rootErr == nil {
			break
		}
		log.Warnf("failed to get root certificate: %v", rootErr)
		time.Sleep(retry.NextBackOff())
	}
	return sk, nil
}
// caFileWatcherHandler watches caFile and rebuilds the XDS proxy's istiod dial
// options on every change event, so a rotated root CA is picked up. It returns
// when ctx is done. If the watch cannot be registered, a warning is logged and
// the loop still runs until ctx is done.
func (a *Agent) caFileWatcherHandler(ctx context.Context, caFile string) {
	if err := a.caFileWatcher.Add(caFile); err != nil {
		// The old format string had "caFile" inside the literal and dropped err.
		log.Warnf("Failed to add file watcher %s: %v", caFile, err)
	} else {
		log.Debugf("Add CA file %s watcher", caFile)
	}
	for {
		select {
		case gotEvent := <-a.caFileWatcher.Events(caFile):
			log.Debugf("Receive file %s event %v", caFile, gotEvent)
			if err := a.xdsProxy.InitIstiodDialOptions(a); err != nil {
				log.Warnf("Failed to init xds proxy dial options: %v", err)
			}
		case <-ctx.Done():
			return
		}
	}
}
// initLocalDNSServer creates and starts the local DNS server, but only for
// sidecar proxies with DNS capture enabled. Gateways and proxies without
// capture get none. A creation error is returned unchanged.
func (a *Agent) initLocalDNSServer() (err error) {
	if !a.cfg.DNSCapture || a.cfg.ProxyType != model.SidecarProxy {
		// we don't need dns server on gateways
		return nil
	}
	a.localDNSServer, err = dnsClient.NewLocalDNSServer(a.cfg.ProxyNamespace, a.cfg.ProxyDomain, a.cfg.DNSAddr)
	if err != nil {
		return err
	}
	a.localDNSServer.StartDNS()
	return nil
}
// generateGRPCBootstrap writes a GRPC_XDS_BOOTSTRAP-compatible file to
// AgentOptions.GRPCBootstrapPath. It builds the file from freshly generated
// node metadata, the local XDS UDS path, the discovery address and the
// certificate output directory. The parent directory is created with mode
// 0700 if needed.
func (a *Agent) generateGRPCBootstrap() error {
	node, err := a.generateNodeMetadata()
	if err != nil {
		return fmt.Errorf("failed generating node metadata: %v", err)
	}
	if err = os.MkdirAll(filepath.Dir(a.cfg.GRPCBootstrapPath), 0o700); err != nil {
		return err
	}
	opts := grpcxds.GenerateBootstrapOptions{
		Node:             node,
		XdsUdsPath:       a.cfg.XdsUdsPath,
		DiscoveryAddress: a.proxyConfig.DiscoveryAddress,
		CertDir:          a.secOpts.OutputKeyCertToDir,
	}
	_, err = grpcxds.GenerateBootstrapFile(opts, a.cfg.GRPCBootstrapPath)
	return err
}
// Check reports whether the agent is ready. It only checks something for
// sidecar proxies with DNS capture enabled (gateways get no DNS server). In
// that case it fails until the local DNS server exists and its lookup table
// is ready.
func (a *Agent) Check() (err error) {
	// we don't need a DNS server on gateways
	if a.cfg.DNSCapture && a.cfg.ProxyType == model.SidecarProxy {
		// localDNSServer is only created by Run (via initLocalDNSServer). A probe
		// that arrives earlier now reports "not ready" instead of dereferencing a
		// nil server.
		if a.localDNSServer == nil || !a.localDNSServer.IsReady() {
			return errors.New("istio DNS capture is turned ON and DNS lookup table is not ready yet")
		}
	}
	return nil
}
// GetDNSTable returns a deep copy of the local DNS server's name table extended
// with alternate host names, or nil if no local DNS server was created. Each
// alternate host missing from the table is added with the supplied IPv4 and
// then IPv6 addresses and the "Kubernetes" registry. The server's own table is
// untouched because only the clone is modified.
func (a *Agent) GetDNSTable() *dnsProto.NameTable {
	if a.localDNSServer == nil {
		return nil
	}
	nt := proto.Clone(a.localDNSServer.NameTable()).(*dnsProto.NameTable)
	a.localDNSServer.BuildAlternateHosts(nt, func(althosts map[string]struct{}, ipv4 []net.IP, ipv6 []net.IP, _ []string) {
		for host := range althosts {
			if _, exists := nt.Table[host]; exists {
				continue
			}
			// Zero length, full capacity: the previous make([]string, n) followed by
			// append prefixed every list with n empty addresses.
			addresses := make([]string, 0, len(ipv4)+len(ipv6))
			for _, ip := range ipv4 {
				addresses = append(addresses, ip.String())
			}
			for _, ip := range ipv6 {
				addresses = append(addresses, ip.String())
			}
			nt.Table[host] = &dnsProto.NameTable_NameInfo{
				Ips:      addresses,
				Registry: "Kubernetes",
			}
		}
	})
	return nt
}
// Close releases the components created by Run, skipping any that were never
// created. Order: XDS proxy, local DNS server, SDS server, secret cache and
// finally the CA file watcher (whose Close error is ignored).
func (a *Agent) Close() {
	if a.xdsProxy != nil {
		a.xdsProxy.close()
	}
	if a.localDNSServer != nil {
		a.localDNSServer.Close()
	}
	if a.sdsServer != nil {
		a.sdsServer.Stop()
	}
	if a.secretCache != nil {
		a.secretCache.Close()
	}
	if a.caFileWatcher != nil {
		// Best-effort during shutdown: the watcher's Close error is discarded.
		_ = a.caFileWatcher.Close()
	}
}
// FindRootCAForXDS determines the root CA file to put in the bootstrap for the
// XDS connection. It may differ from the CA used for the cert server (which is
// based on CA_ADDR), and covers an XDS server on port 443 with a proper cert.
// An empty path with a nil error means "use the system root certificates".
// Rules are tried in order; a selected path that does not exist is an error,
// except for the mounted default cert and the PROV_CERT shortcut.
func (a *Agent) FindRootCAForXDS() (string, error) {
	var rootCAPath string
	switch {
	case a.cfg.XDSRootCerts == security.SystemRootCerts:
		// Special case input for root cert configuration to use system root certificates.
		return "", nil
	case a.cfg.XDSRootCerts != "":
		// Using specific platform certs or custom roots.
		rootCAPath = a.cfg.XDSRootCerts
	case fileExists(security.DefaultRootCertFilePath):
		// Old style mounted cert, used for XDS auth only (not for CA_ADDR): this
		// mode relies on an external agent for secret refresh.
		return security.DefaultRootCertFilePath, nil
	case a.secOpts.PilotCertProvider == constants.CertProviderKubernetes:
		// Using K8S - this is likely incorrect, may work by accident (https://github.com/istio/istio/issues/22161)
		rootCAPath = k8sCAPath
	case a.secOpts.ProvCert != "":
		// PROV_CERT is really meant for CA_ADDR auth only. On VMs the root cert may
		// only appear later, so existence is deliberately not checked.
		return a.secOpts.ProvCert + "/root-cert.pem", nil
	case a.secOpts.FileMountedCerts:
		// FileMountedCerts - Load it from Proxy Metadata.
		rootCAPath = a.proxyConfig.ProxyMetadata[MetadataClientRootCert]
	case a.secOpts.PilotCertProvider == constants.CertProviderNone:
		return "", fmt.Errorf("root CA file for XDS required but configured provider as none")
	default:
		// PILOT_CERT_PROVIDER defaults to istiod: a mounted config map on K8S.
		rootCAPath = path.Join(CitadelCACertPath, constants.CACertNamespaceConfigMapDataName)
	}

	// Fail early here rather than with an obscure Envoy error later.
	if !fileExists(rootCAPath) {
		return "", fmt.Errorf("root CA file for XDS does not exist %s", rootCAPath)
	}
	return rootCAPath, nil
}
// GetKeyCertsForXDS returns the (key, cert) file paths for the XDS connection.
// They come from the PROV_CERT directory when set, otherwise from proxy
// metadata when certificates are file-mounted, and are both empty otherwise.
func (a *Agent) GetKeyCertsForXDS() (string, string) {
	switch {
	case a.secOpts.ProvCert != "":
		return getKeyCertInner(a.secOpts.ProvCert)
	case a.secOpts.FileMountedCerts:
		meta := a.proxyConfig.ProxyMetadata
		return meta[MetadataClientCertKey], meta[MetadataClientCertChain]
	default:
		return "", ""
	}
}
// fileExists reports whether path names an existing regular file (symlinks are
// followed). Stat failures, including a missing path, and non-regular entries
// such as directories all yield false.
func fileExists(path string) bool {
	info, err := os.Stat(path)
	return err == nil && info.Mode().IsRegular()
}
// socketFileExists reports whether something other than a regular file exists
// at path. NOTE(review): despite the name it does not test os.ModeSocket, so a
// directory or named pipe also counts — checkSocket then health-checks it.
func socketFileExists(path string) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false
	}
	return !info.Mode().IsRegular()
}
// checkSocket reports whether a responsive SDS socket exists at socketPath.
//   - nothing there: (false, nil)
//   - present and answers a gRPC dial: (true, nil)
//   - present but unresponsive: the file is removed and (false, nil) is
//     returned, or (false, error) if removal fails.
func checkSocket(ctx context.Context, socketPath string) (bool, error) {
	if !socketFileExists(socketPath) {
		return false, nil
	}
	healthErr := socketHealthCheck(ctx, socketPath)
	if healthErr == nil {
		return true, nil
	}
	log.Debugf("SDS socket detected but not healthy: %v", healthErr)
	if rmErr := os.Remove(socketPath); rmErr != nil {
		return false, fmt.Errorf("existing SDS socket could not be removed: %v", rmErr)
	}
	return false, nil
}
// socketHealthCheck dials the gRPC endpoint on the unix socket at socketPath,
// blocking for at most one second, and returns any dial failure. A successful
// connection is closed immediately; its Close error is ignored.
func socketHealthCheck(ctx context.Context, socketPath string) error {
	dialCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	target := fmt.Sprintf("unix:%s", socketPath)
	conn, err := grpc.DialContext(dialCtx, target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.FailOnNonTempDialError(true),
		grpc.WithReturnConnectionError(),
		grpc.WithBlock(),
	)
	if err != nil {
		return err
	}
	_ = conn.Close()
	return nil
}
// FindRootCAForCA finds the root CA file used when connecting to the CA
// (istiod or external). An empty path with a nil error selects the system
// roots. Rules are tried in order; a selected path that does not exist is an
// error, except for the PROV_CERT shortcut.
func (a *Agent) FindRootCAForCA() (string, error) {
	var rootCAPath string
	switch {
	case a.cfg.CARootCerts == security.SystemRootCerts:
		return "", nil
	case a.cfg.CARootCerts != "":
		rootCAPath = a.cfg.CARootCerts
	case a.secOpts.PilotCertProvider == constants.CertProviderKubernetes:
		// Using K8S - this is likely incorrect, may work by accident.
		rootCAPath = k8sCAPath // ./var/run/secrets/kubernetes.io/serviceaccount/ca.crt
	case a.secOpts.PilotCertProvider == constants.CertProviderCustom:
		rootCAPath = security.DefaultRootCertFilePath // ./etc/certs/root-cert.pem
	case a.secOpts.ProvCert != "":
		// PROV_CERT: on VMs the root cert may only appear later, so existence is
		// deliberately not checked.
		return a.secOpts.ProvCert + "/root-cert.pem", nil
	case a.secOpts.PilotCertProvider == constants.CertProviderNone:
		return "", fmt.Errorf("root CA file for CA required but configured provider as none")
	default:
		// The default - a mounted config map on K8S, e.g. ./var/run/secrets/istio/root-cert.pem
		rootCAPath = path.Join(CitadelCACertPath, constants.CACertNamespaceConfigMapDataName)
	}

	if !fileExists(rootCAPath) {
		return "", fmt.Errorf("root CA file for CA does not exist %s", rootCAPath)
	}
	return rootCAPath, nil
}
// getKeyCertsForCA returns the (key, cert) file paths used to authenticate to
// the CA server. Both are empty unless a PROV_CERT directory is configured.
func (a *Agent) getKeyCertsForCA() (string, string) {
	if a.secOpts.ProvCert == "" {
		return "", ""
	}
	return getKeyCertInner(a.secOpts.ProvCert)
}
// getKeyCertInner returns the private key and certificate chain paths inside
// certPath, using the standard file names from the constants package.
func getKeyCertInner(certPath string) (string, string) {
	return path.Join(certPath, constants.KeyFilename), path.Join(certPath, constants.CertChainFilename)
}
// newSecretManager creates the SecretManager for workload secrets.
//
// With file-mounted certificates no CA client is used. Otherwise the CA client
// depends on the provider name: the Google CA or Google CAS plugin, or, by
// default, the Citadel/istiod client. The Citadel client uses TLS unless the
// CA endpoint is on the reserved plaintext port 15010 (istiod on an
// IPsec/secure network, mostly for debugging).
func (a *Agent) newSecretManager() (*cache.SecretManagerClient, error) {
	// If proxy is using file mounted certs, we do not have to connect to CA.
	if a.secOpts.FileMountedCerts {
		log.Info("Workload is using file mounted certificates. Skipping connecting to CA")
		return cache.NewSecretManagerClient(nil, a.secOpts)
	}
	log.Infof("CA Endpoint %s, provider %s", a.secOpts.CAEndpoint, a.secOpts.CAProviderName)

	// TODO: this should all be packaged in a plugin, possibly with optional compilation.
	switch a.secOpts.CAProviderName {
	case security.GoogleCAProvider:
		// External CA plugin with direct support for the K8S JWT token; only used
		// when the matching env variables are injected.
		caClient, err := gca.NewGoogleCAClient(a.secOpts.CAEndpoint, true, caclient.NewCATokenProvider(a.secOpts))
		if err != nil {
			return nil, err
		}
		return cache.NewSecretManagerClient(caClient, a.secOpts)
	case security.GoogleCASProvider:
		caClient, err := cas.NewGoogleCASClient(a.secOpts.CAEndpoint,
			option.WithGRPCDialOption(grpc.WithPerRPCCredentials(caclient.NewCATokenProvider(a.secOpts))))
		if err != nil {
			return nil, err
		}
		return cache.NewSecretManagerClient(caClient, a.secOpts)
	}

	// Citadel / istiod CA. tlsOpts stays nil on the plaintext debug port.
	var tlsOpts *citadel.TLSOptions
	if strings.HasSuffix(a.secOpts.CAEndpoint, ":15010") {
		log.Warn("Debug mode or IP-secure network")
	} else {
		rootCert, err := a.FindRootCAForCA()
		if err != nil {
			return nil, fmt.Errorf("failed to find root CA cert for CA: %v", err)
		}
		if rootCert == "" {
			log.Infof("Using CA %s cert with system certs", a.secOpts.CAEndpoint)
		} else if _, statErr := os.Stat(rootCert); os.IsNotExist(statErr) {
			log.Fatalf("invalid config - %s missing a root certificate %s", a.secOpts.CAEndpoint, rootCert)
		} else {
			log.Infof("Using CA %s cert with certs: %s", a.secOpts.CAEndpoint, rootCert)
		}
		key, cert := a.getKeyCertsForCA()
		tlsOpts = &citadel.TLSOptions{RootCert: rootCert, Key: key, Cert: cert}
	}

	// An empty RootCert means system roots, so the CA is expected to present a
	// publicly trusted certificate.
	caClient, err := citadel.NewCitadelClient(a.secOpts, tlsOpts)
	if err != nil {
		return nil, err
	}
	return cache.NewSecretManagerClient(caClient, a.secOpts)
}
// GRPCBootstrapPath returns the configured path of the gRPC XDS bootstrap file
// (AgentOptions.GRPCBootstrapPath), or "" if none is configured. It reports
// the configured path regardless of whether the file has been generated yet;
// Run generates it when the path is non-empty.
func (a *Agent) GRPCBootstrapPath() string {
	return a.cfg.GRPCBootstrapPath
}