// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package istioagent
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"math"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"go.uber.org/atomic"
google_rpc "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
any "google.golang.org/protobuf/types/known/anypb"
meshconfig "istio.io/api/mesh/v1alpha1"
"istio.io/pkg/log"

"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/status/ready"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
istiogrpc "github.com/apache/dubbo-go-pixiu/pilot/pkg/grpc"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pkg/config/constants"
dnsProto "github.com/apache/dubbo-go-pixiu/pkg/dns/proto"
"github.com/apache/dubbo-go-pixiu/pkg/istio-agent/health"
"github.com/apache/dubbo-go-pixiu/pkg/istio-agent/metrics"
istiokeepalive "github.com/apache/dubbo-go-pixiu/pkg/keepalive"
"github.com/apache/dubbo-go-pixiu/pkg/uds"
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
"github.com/apache/dubbo-go-pixiu/pkg/wasm"
"github.com/apache/dubbo-go-pixiu/security/pkg/nodeagent/caclient"
"github.com/apache/dubbo-go-pixiu/security/pkg/pki/util"
)
const (
defaultClientMaxReceiveMessageSize = math.MaxInt32
defaultInitialConnWindowSize = 1024 * 1024 // default gRPC InitialWindowSize
defaultInitialWindowSize = 1024 * 1024 // default gRPC ConnWindowSize
)
var connectionNumber = atomic.NewUint32(0)
// ResponseHandler handles an XDS response in the agent. These responses will not be forwarded to Envoy.
// Currently, all handlers function on a single resource per type, so the API only exposes one
// resource.
type ResponseHandler func(resp *any.Any) error
// XdsProxy proxies all XDS requests from Envoy to istiod, in addition to allowing
// subsystems inside the agent to also communicate with either istiod or Envoy (e.g. DNS, SDS, etc.).
// The goal here is to consolidate all XDS-related connections to istiod/Envoy into a
// single TCP connection with multiple gRPC streams.
// TODO: Right now, the workloadSDS server and gatewaySDS servers are still separate
// connections. These need to be consolidated.
// TODO: consolidate/use ADSC struct - a lot of duplication.
type XdsProxy struct {
stopChan chan struct{}
clusterID string
downstreamListener net.Listener
downstreamGrpcServer *grpc.Server
istiodAddress string
istiodDialOptions []grpc.DialOption
optsMutex sync.RWMutex
handlers map[string]ResponseHandler
healthChecker *health.WorkloadHealthChecker
xdsHeaders map[string]string
xdsUdsPath string
proxyAddresses []string
httpTapServer *http.Server
tapMutex sync.RWMutex
tapResponseChannel chan *discovery.DiscoveryResponse
// connected stores the active gRPC stream. The proxy will only have one connection at a time
connected *ProxyConnection
initialRequest *discovery.DiscoveryRequest
initialDeltaRequest *discovery.DeltaDiscoveryRequest
connectedMutex sync.RWMutex
// The Wasm cache and ECDS channel are used to replace Wasm remote loads with local files.
wasmCache wasm.Cache
// ecds version and nonce use atomics only to prevent races in testing.
// In reality there should be no race, as istiod will only have one
// in-flight update for each type of resource.
// TODO(bianpengyuan): this relies on the fact that istiod versions all ECDS resources
// the same in an update response. This needs updating to support per-resource versioning,
// in case istiod changes its behavior, or a different ECDS server is used.
ecdsLastAckVersion atomic.String
ecdsLastNonce atomic.String
downstreamGrpcOptions []grpc.ServerOption
istiodSAN string
}
var proxyLog = log.RegisterScope("xdsproxy", "XDS Proxy in Istio Agent", 0)
const (
localHostIPv4 = "127.0.0.1"
localHostIPv6 = "[::1]"
)
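// initXdsProxy builds the agent-side XDS proxy: it creates the Wasm cache, registers the
// internal response handlers (NDS name tables when a local DNS server is present, and
// dynamic proxy config when enabled), starts the downstream gRPC server on the agent's
// UDS path, prepares the upstream dial options, and kicks off the application health
// checker whose probe results are persisted as HealthInfoType requests.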
func initXdsProxy(ia *Agent) (*XdsProxy, error) {
var err error
localHostAddr := localHostIPv4
if ia.cfg.IsIPv6 {
localHostAddr = localHostIPv6
}
var envoyProbe ready.Prober
if !ia.cfg.DisableEnvoy {
envoyProbe = &ready.Probe{
AdminPort: uint16(ia.proxyConfig.ProxyAdminPort),
LocalHostAddr: localHostAddr,
}
}
cache := wasm.NewLocalFileCache(constants.IstioDataDir, wasm.DefaultWasmModulePurgeInterval, wasm.DefaultWasmModuleExpiry, ia.cfg.WASMInsecureRegistries)
proxy := &XdsProxy{
istiodAddress: ia.proxyConfig.DiscoveryAddress,
istiodSAN: ia.cfg.IstiodSAN,
clusterID: ia.secOpts.ClusterID,
handlers: map[string]ResponseHandler{},
stopChan: make(chan struct{}),
healthChecker: health.NewWorkloadHealthChecker(ia.proxyConfig.ReadinessProbe, envoyProbe, ia.cfg.ProxyIPAddresses, ia.cfg.IsIPv6),
xdsHeaders: ia.cfg.XDSHeaders,
xdsUdsPath: ia.cfg.XdsUdsPath,
wasmCache: cache,
proxyAddresses: ia.cfg.ProxyIPAddresses,
downstreamGrpcOptions: ia.cfg.DownstreamGrpcOptions,
}
if ia.localDNSServer != nil {
proxy.handlers[v3.NameTableType] = func(resp *any.Any) error {
var nt dnsProto.NameTable
if err := resp.UnmarshalTo(&nt); err != nil {
log.Errorf("failed to unmarshal name table: %v", err)
return err
}
ia.localDNSServer.UpdateLookupTable(&nt)
return nil
}
}
if ia.cfg.EnableDynamicProxyConfig && ia.secretCache != nil {
proxy.handlers[v3.ProxyConfigType] = func(resp *any.Any) error {
pc := &meshconfig.ProxyConfig{}
if err := resp.UnmarshalTo(pc); err != nil {
log.Errorf("failed to unmarshal proxy config: %v", err)
return err
}
caCerts := pc.GetCaCertificatesPem()
log.Debugf("received new certificates to add to mesh trust domain: %v", caCerts)
trustBundle := []byte{}
for _, cert := range caCerts {
trustBundle = util.AppendCertByte(trustBundle, []byte(cert))
}
return ia.secretCache.UpdateConfigTrustBundle(trustBundle)
}
}
proxyLog.Infof("Initializing with upstream address %q and cluster %q", proxy.istiodAddress, proxy.clusterID)
if err = proxy.initDownstreamServer(); err != nil {
return nil, err
}
if err = proxy.InitIstiodDialOptions(ia); err != nil {
return nil, err
}
go func() {
if err := proxy.downstreamGrpcServer.Serve(proxy.downstreamListener); err != nil {
log.Errorf("failed to accept downstream gRPC connection %v", err)
}
}()
go proxy.healthChecker.PerformApplicationHealthCheck(func(healthEvent *health.ProbeEvent) {
// Store the same request in both SotW and Delta form. Depending on how Envoy connects we will use one or the other.
var req *discovery.DiscoveryRequest
if healthEvent.Healthy {
req = &discovery.DiscoveryRequest{TypeUrl: v3.HealthInfoType}
} else {
req = &discovery.DiscoveryRequest{
TypeUrl: v3.HealthInfoType,
ErrorDetail: &google_rpc.Status{
Code: int32(codes.Internal),
Message: healthEvent.UnhealthyMessage,
},
}
}
proxy.PersistRequest(req)
var deltaReq *discovery.DeltaDiscoveryRequest
if healthEvent.Healthy {
deltaReq = &discovery.DeltaDiscoveryRequest{TypeUrl: v3.HealthInfoType}
} else {
deltaReq = &discovery.DeltaDiscoveryRequest{
TypeUrl: v3.HealthInfoType,
ErrorDetail: &google_rpc.Status{
Code: int32(codes.Internal),
Message: healthEvent.UnhealthyMessage,
},
}
}
proxy.PersistDeltaRequest(deltaReq)
}, proxy.stopChan)
return proxy, nil
}
// PersistRequest sends a request to the currently connected proxy. Additionally, on any reconnection
// to the upstream XDS server we will resend this request.
func (p *XdsProxy) PersistRequest(req *discovery.DiscoveryRequest) {
var ch chan *discovery.DiscoveryRequest
var stop chan struct{}
p.connectedMutex.Lock()
if p.connected != nil {
ch = p.connected.requestsChan
stop = p.connected.stopChan
}
p.initialRequest = req
p.connectedMutex.Unlock()
// Immediately send if we are currently connected
if ch != nil {
select {
case ch <- req:
case <-stop:
}
}
}
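// UnregisterStream clears the given connection if it is still the active one, closing its
// stop channel so that all goroutines tied to the stream exit.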
func (p *XdsProxy) UnregisterStream(c *ProxyConnection) {
p.connectedMutex.Lock()
defer p.connectedMutex.Unlock()
if p.connected != nil && p.connected == c {
close(p.connected.stopChan)
p.connected = nil
}
}
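// RegisterStream installs the given connection as the active stream. Any previously
// registered stream is closed first, since the proxy keeps at most one upstream
// connection at a time.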
func (p *XdsProxy) RegisterStream(c *ProxyConnection) {
p.connectedMutex.Lock()
defer p.connectedMutex.Unlock()
if p.connected != nil {
proxyLog.Warnf("registered overlapping stream; closing previous")
close(p.connected.stopChan)
}
p.connected = c
}
type ProxyConnection struct {
conID uint32
upstreamError chan error
downstreamError chan error
requestsChan chan *discovery.DiscoveryRequest
responsesChan chan *discovery.DiscoveryResponse
deltaRequestsChan chan *discovery.DeltaDiscoveryRequest
deltaResponsesChan chan *discovery.DeltaDiscoveryResponse
stopChan chan struct{}
downstream adsStream
upstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
downstreamDeltas discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
upstreamDeltas discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient
}
// sendRequest is a small wrapper around sending to con.requestsChan. This ensures that we do not
// block forever on a stopped connection: the send aborts once con.stopChan is closed.
func (con *ProxyConnection) sendRequest(req *discovery.DiscoveryRequest) {
select {
case con.requestsChan <- req:
case <-con.stopChan:
}
}
type adsStream interface {
Send(*discovery.DiscoveryResponse) error
Recv() (*discovery.DiscoveryRequest, error)
Context() context.Context
}
// Every time Envoy makes a fresh connection to the agent, we reestablish a new connection to the upstream XDS server.
// This ensures that a new connection between istiod and the agent doesn't end up consuming pending messages from Envoy,
// as the new connection may not go to the same istiod. The same applies in the reverse direction.
func (p *XdsProxy) StreamAggregatedResources(downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
proxyLog.Debugf("accepted XDS connection from Envoy, forwarding to upstream XDS server")
return p.handleStream(downstream)
}
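// handleStream wraps the downstream stream in a ProxyConnection, registers it, dials
// istiod with a short connect timeout, attaches the ClusterID and any configured XDS
// headers as outgoing gRPC metadata, and then proxies the stream via HandleUpstream.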
func (p *XdsProxy) handleStream(downstream adsStream) error {
con := &ProxyConnection{
conID: connectionNumber.Inc(),
upstreamError: make(chan error, 2), // can be produced by recv and send
downstreamError: make(chan error, 2), // can be produced by recv and send
requestsChan: make(chan *discovery.DiscoveryRequest, 10),
responsesChan: make(chan *discovery.DiscoveryResponse, 10),
stopChan: make(chan struct{}),
downstream: downstream,
}
p.RegisterStream(con)
defer p.UnregisterStream(con)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
upstreamConn, err := p.buildUpstreamConn(ctx)
if err != nil {
proxyLog.Errorf("failed to connect to upstream %s: %v", p.istiodAddress, err)
metrics.IstiodConnectionFailures.Increment()
return err
}
defer upstreamConn.Close()
xds := discovery.NewAggregatedDiscoveryServiceClient(upstreamConn)
ctx = metadata.AppendToOutgoingContext(context.Background(), "ClusterID", p.clusterID)
for k, v := range p.xdsHeaders {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
}
// We must propagate upstream termination to Envoy. This ensures that we resume the full XDS sequence on a new connection
return p.HandleUpstream(ctx, con, xds)
}
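// buildUpstreamConn dials istiod with a snapshot of the current dial options, copied
// under the read lock so that a concurrent InitIstiodDialOptions is safe.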
func (p *XdsProxy) buildUpstreamConn(ctx context.Context) (*grpc.ClientConn, error) {
p.optsMutex.RLock()
opts := make([]grpc.DialOption, 0, len(p.istiodDialOptions))
opts = append(opts, p.istiodDialOptions...)
p.optsMutex.RUnlock()
return grpc.DialContext(ctx, p.istiodAddress, opts...)
}
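// HandleUpstream opens the upstream ADS stream and starts the forwarding goroutines:
// one pumping responses from istiod into responsesChan, plus handleUpstreamRequest and
// handleUpstreamResponse. It then blocks until an upstream error, a downstream error,
// or a stop signal terminates the connection.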
func (p *XdsProxy) HandleUpstream(ctx context.Context, con *ProxyConnection, xds discovery.AggregatedDiscoveryServiceClient) error {
upstream, err := xds.StreamAggregatedResources(ctx,
grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
if err != nil {
// Envoy logs errors again, so no need to log beyond debug level
proxyLog.Debugf("failed to create upstream grpc client: %v", err)
// Increment the metric on XDS connection errors, for example when the ingress gateway or sidecar was not restarted after changing the root CA.
metrics.IstiodConnectionErrors.Increment()
return err
}
proxyLog.Infof("connected to upstream XDS server: %s", p.istiodAddress)
defer proxyLog.Debugf("disconnected from XDS server: %s", p.istiodAddress)
con.upstream = upstream
// Handle upstream xds recv
go func() {
for {
// from istiod
resp, err := con.upstream.Recv()
if err != nil {
select {
case con.upstreamError <- err:
case <-con.stopChan:
}
return
}
select {
case con.responsesChan <- resp:
case <-con.stopChan:
}
}
}()
go p.handleUpstreamRequest(con)
go p.handleUpstreamResponse(con)
for {
select {
case err := <-con.upstreamError:
// error from upstream Istiod.
if istiogrpc.IsExpectedGRPCError(err) {
proxyLog.Debugf("upstream [%d] terminated with status %v", con.conID, err)
metrics.IstiodConnectionCancellations.Increment()
} else {
proxyLog.Warnf("upstream [%d] terminated with unexpected error %v", con.conID, err)
metrics.IstiodConnectionErrors.Increment()
}
return err
case err := <-con.downstreamError:
// error from downstream Envoy.
if istiogrpc.IsExpectedGRPCError(err) {
proxyLog.Debugf("downstream [%d] terminated with status %v", con.conID, err)
metrics.EnvoyConnectionCancellations.Increment()
} else {
proxyLog.Warnf("downstream [%d] terminated with unexpected error %v", con.conID, err)
metrics.EnvoyConnectionErrors.Increment()
}
// On downstream error, we will return. This propagates the error to downstream envoy which will trigger reconnect
return err
case <-con.stopChan:
proxyLog.Debugf("stream stopped")
return nil
}
}
}
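// handleUpstreamRequest forwards requests from Envoy (and internal senders) to istiod.
// On the first LDS request it also fires the initial NDS/PCDS requests for any registered
// handlers along with the persisted initial request, and it records the ECDS version and
// nonce so that Wasm conversion failures can be NACKed later.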
func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
initialRequestsSent := atomic.NewBool(false)
go func() {
for {
// recv xds requests from envoy
req, err := con.downstream.Recv()
if err != nil {
select {
case con.downstreamError <- err:
case <-con.stopChan:
}
return
}
// forward to istiod
con.sendRequest(req)
if !initialRequestsSent.Load() && req.TypeUrl == v3.ListenerType {
// fire off an initial NDS request
if _, f := p.handlers[v3.NameTableType]; f {
con.sendRequest(&discovery.DiscoveryRequest{
TypeUrl: v3.NameTableType,
})
}
// fire off an initial PCDS request
if _, f := p.handlers[v3.ProxyConfigType]; f {
con.sendRequest(&discovery.DiscoveryRequest{
TypeUrl: v3.ProxyConfigType,
})
}
// set the flag before sending the initial requests to prevent a race.
initialRequestsSent.Store(true)
// Fire off the configured initial request, if there is one
p.connectedMutex.RLock()
initialRequest := p.initialRequest
if initialRequest != nil {
con.sendRequest(initialRequest)
}
p.connectedMutex.RUnlock()
}
}
}()
defer con.upstream.CloseSend() // nolint
for {
select {
case req := <-con.requestsChan:
if req.TypeUrl == v3.HealthInfoType && !initialRequestsSent.Load() {
// only send the health check probe after the LDS request has been sent
continue
}
proxyLog.Debugf("request for type url %s", req.TypeUrl)
metrics.XdsProxyRequests.Increment()
if req.TypeUrl == v3.ExtensionConfigurationType {
if req.VersionInfo != "" {
p.ecdsLastAckVersion.Store(req.VersionInfo)
}
p.ecdsLastNonce.Store(req.ResponseNonce)
}
if err := sendUpstream(con.upstream, req); err != nil {
proxyLog.Errorf("upstream [%d] send error for type url %s: %v", con.conID, req.TypeUrl, err)
con.upstreamError <- err
return
}
case <-con.stopChan:
return
}
}
}
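// handleUpstreamResponse dispatches responses from istiod: internal types go to their
// registered handler and are ACKed/NACKed here, ECDS may be rewritten for Wasm before
// being forwarded, istio.io/debug responses go to the tap channel, and everything else
// is forwarded to Envoy directly.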
func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
forwardEnvoyCh := make(chan *discovery.DiscoveryResponse, 1)
for {
select {
case resp := <-con.responsesChan:
// TODO: separate upstream response handling from requests sending, which are both time costly
proxyLog.Debugf("response for type url %s", resp.TypeUrl)
metrics.XdsProxyResponses.Increment()
if h, f := p.handlers[resp.TypeUrl]; f {
if len(resp.Resources) == 0 {
// Empty response, nothing to do.
// This assumes internal types are always singletons.
break
}
err := h(resp.Resources[0])
var errorResp *google_rpc.Status
if err != nil {
errorResp = &google_rpc.Status{
Code: int32(codes.Internal),
Message: err.Error(),
}
}
// Send ACK/NACK
con.sendRequest(&discovery.DiscoveryRequest{
VersionInfo: resp.VersionInfo,
TypeUrl: resp.TypeUrl,
ResponseNonce: resp.Nonce,
ErrorDetail: errorResp,
})
continue
}
switch resp.TypeUrl {
case v3.ExtensionConfigurationType:
if features.WasmRemoteLoadConversion {
// If Wasm remote load conversion feature is enabled, rewrite and send.
go p.rewriteAndForward(con, resp, func(resp *discovery.DiscoveryResponse) {
// Forward the response using the thread of `handleUpstreamResponse`
// to prevent concurrent access to forwardToEnvoy
select {
case forwardEnvoyCh <- resp:
case <-con.stopChan:
}
})
} else {
// Otherwise, forward ECDS resource update directly to Envoy.
forwardToEnvoy(con, resp)
}
default:
if strings.HasPrefix(resp.TypeUrl, "istio.io/debug") {
p.forwardToTap(resp)
} else {
forwardToEnvoy(con, resp)
}
}
case resp := <-forwardEnvoyCh:
forwardToEnvoy(con, resp)
case <-con.stopChan:
return
}
}
}
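// rewriteAndForward tries to convert remote Wasm fetches in the ECDS resources into
// locally cached files via the Wasm cache. On conversion failure it NACKs the update;
// otherwise it hands the (possibly rewritten) response to the forward callback.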
func (p *XdsProxy) rewriteAndForward(con *ProxyConnection, resp *discovery.DiscoveryResponse, forward func(resp *discovery.DiscoveryResponse)) {
sendNack := wasm.MaybeConvertWasmExtensionConfig(resp.Resources, p.wasmCache)
if sendNack {
proxyLog.Debugf("sending NACK for ECDS resources %+v", resp.Resources)
con.sendRequest(&discovery.DiscoveryRequest{
VersionInfo: p.ecdsLastAckVersion.Load(),
TypeUrl: v3.ExtensionConfigurationType,
ResponseNonce: resp.Nonce,
ErrorDetail: &google_rpc.Status{
// TODO(bianpengyuan): make error message more informative.
Message: "failed to fetch wasm module",
},
})
return
}
proxyLog.Debugf("forward ECDS resources %+v", resp.Resources)
forward(resp)
}
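// forwardToTap delivers a debug response to a pending tap request, dropping it if no
// tap request is currently waiting on the channel.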
func (p *XdsProxy) forwardToTap(resp *discovery.DiscoveryResponse) {
select {
case p.tapResponseChannel <- resp:
default:
log.Infof("tap response %q arrived too late; discarding", resp.TypeUrl)
}
}
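// forwardToEnvoy sends a response down to Envoy, refusing to forward non-Envoy types
// and reporting send failures on the downstream error channel.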
func forwardToEnvoy(con *ProxyConnection, resp *discovery.DiscoveryResponse) {
if !v3.IsEnvoyType(resp.TypeUrl) {
proxyLog.Errorf("Skipping forwarding type url %s to Envoy as is not a valid Envoy type", resp.TypeUrl)
return
}
if err := sendDownstream(con.downstream, resp); err != nil {
select {
case con.downstreamError <- err:
// We cannot return a partial error and hope to restart just the downstream,
// as we are blindly proxying requests/responses. For now, the best course of action
// is to terminate the upstream connection as well and restart afresh.
proxyLog.Errorf("downstream [%d] send error: %v", con.conID, err)
default:
// Do not block on the downstream error channel push; this can happen when forwarding
// is triggered from a separate goroutine (e.g. the ECDS processing goroutine) while the
// downstream connection has already been torn down and no receiver is available
// for the downstream error channel.
proxyLog.Debugf("downstream [%d] error channel full, but got a downstream send error: %v", con.conID, err)
}
return
}
}
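// close shuts the proxy down: it signals all connection goroutines via stopChan, cleans
// up the Wasm cache, and closes the tap server, the downstream gRPC server, and the
// downstream listener if they were started.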
func (p *XdsProxy) close() {
close(p.stopChan)
p.wasmCache.Cleanup()
if p.httpTapServer != nil {
_ = p.httpTapServer.Close()
}
if p.downstreamGrpcServer != nil {
p.downstreamGrpcServer.Stop()
}
if p.downstreamListener != nil {
_ = p.downstreamListener.Close()
}
}
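// initDownstreamServer creates the UDS listener and the gRPC server that Envoy connects
// to, registering the ADS service and gRPC reflection on it.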
func (p *XdsProxy) initDownstreamServer() error {
l, err := uds.NewListener(p.xdsUdsPath)
if err != nil {
return err
}
// TODO: Expose keepalive options to agent cmd line flags.
opts := p.downstreamGrpcOptions
opts = append(opts, istiogrpc.ServerOptions(istiokeepalive.DefaultOption())...)
grpcs := grpc.NewServer(opts...)
discovery.RegisterAggregatedDiscoveryServiceServer(grpcs, p)
reflection.Register(grpcs)
p.downstreamGrpcServer = grpcs
p.downstreamListener = l
return nil
}
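// InitIstiodDialOptions (re)builds the dial options used for upstream istiod connections,
// holding the write lock so that in-flight dials see a consistent snapshot.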
func (p *XdsProxy) InitIstiodDialOptions(agent *Agent) error {
opts, err := p.buildUpstreamClientDialOpts(agent)
if err != nil {
return err
}
p.optsMutex.Lock()
p.istiodDialOptions = opts
p.optsMutex.Unlock()
return nil
}
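// buildUpstreamClientDialOpts assembles the upstream dial options: transport security,
// client keepalives, initial window sizes, the max receive message size, and per-RPC
// token credentials.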
func (p *XdsProxy) buildUpstreamClientDialOpts(sa *Agent) ([]grpc.DialOption, error) {
tlsOpts, err := p.getTLSDialOption(sa)
if err != nil {
return nil, fmt.Errorf("failed to build TLS dial option to talk to upstream: %v", err)
}
keepaliveOption := grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
})
initialWindowSizeOption := grpc.WithInitialWindowSize(int32(defaultInitialWindowSize))
initialConnWindowSizeOption := grpc.WithInitialConnWindowSize(int32(defaultInitialConnWindowSize))
msgSizeOption := grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
// Make sure the dial is blocking as we don't want any other operation to resume until the
// connection to upstream has been made.
dialOptions := []grpc.DialOption{
tlsOpts,
keepaliveOption, initialWindowSizeOption, initialConnWindowSizeOption, msgSizeOption,
}
dialOptions = append(dialOptions, grpc.WithPerRPCCredentials(caclient.NewXDSTokenProvider(sa.secOpts)))
return dialOptions, nil
}
// Returns the TLS option to use when talking to Istiod.
// If a provisioned cert is set, it will return an mTLS-related config.
// Otherwise it will return a one-way TLS-related config, with the assumption
// that the consumer code will use tokens to authenticate the upstream.
func (p *XdsProxy) getTLSDialOption(agent *Agent) (grpc.DialOption, error) {
if agent.proxyConfig.ControlPlaneAuthPolicy == meshconfig.AuthenticationPolicy_NONE {
return grpc.WithTransportCredentials(insecure.NewCredentials()), nil
}
rootCert, err := p.getRootCertificate(agent)
if err != nil {
return nil, err
}
config := tls.Config{
GetClientCertificate: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
var certificate tls.Certificate
key, cert := agent.GetKeyCertsForXDS()
if key != "" && cert != "" {
// Load the certificate from disk
certificate, err = tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, err
}
}
return &certificate, nil
},
RootCAs: rootCert,
}
// strip the port from the address
parts := strings.Split(agent.proxyConfig.DiscoveryAddress, ":")
config.ServerName = parts[0]
// For debugging on localhost (with port forward)
// This matches the logic for the CA; this code should eventually be shared
if strings.Contains(config.ServerName, "localhost") {
config.ServerName = "istiod.dubbo-system.svc"
}
if p.istiodSAN != "" {
config.ServerName = p.istiodSAN
}
// TODO: if istiodSAN starts with spiffe://, use custom validation.
config.MinVersion = tls.VersionTLS12
transportCreds := credentials.NewTLS(&config)
return grpc.WithTransportCredentials(transportCreds), nil
}
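// getRootCertificate builds the root CA pool for upstream TLS: it loads the CA file
// located for XDS if one is configured, and otherwise falls back to the system cert pool.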
func (p *XdsProxy) getRootCertificate(agent *Agent) (*x509.CertPool, error) {
var certPool *x509.CertPool
var rootCert []byte
xdsCACertPath, err := agent.FindRootCAForXDS()
if err != nil {
return nil, fmt.Errorf("failed to find root CA cert for XDS: %v", err)
}
if xdsCACertPath != "" {
rootCert, err = os.ReadFile(xdsCACertPath)
if err != nil {
return nil, err
}
certPool = x509.NewCertPool()
ok := certPool.AppendCertsFromPEM(rootCert)
if !ok {
return nil, fmt.Errorf("failed to create TLS dial option with root certificates")
}
} else {
certPool, err = x509.SystemCertPool()
if err != nil {
return nil, err
}
}
return certPool, nil
}
// sendUpstream sends a discovery request to the upstream istiod.
func sendUpstream(upstream xds.DiscoveryClient, request *discovery.DiscoveryRequest) error {
return istiogrpc.Send(upstream.Context(), func() error { return upstream.Send(request) })
}
// sendDownstream sends a discovery response to the downstream Envoy.
func sendDownstream(downstream adsStream, response *discovery.DiscoveryResponse) error {
return istiogrpc.Send(downstream.Context(), func() error { return downstream.Send(response) })
}
// tapRequest() sends "req" to Istiod, and returns a matching response, or `nil` on timeout.
// Requests are serialized -- only one may be in-flight at a time.
func (p *XdsProxy) tapRequest(req *discovery.DiscoveryRequest, timeout time.Duration) (*discovery.DiscoveryResponse, error) {
if p.connected == nil {
return nil, fmt.Errorf("proxy not connected to Istiod")
}
// Only allow one tap request at a time
p.tapMutex.Lock()
defer p.tapMutex.Unlock()
// Send to Istiod
p.connected.sendRequest(req)
// Wait for expected response or timeout
for {
select {
case res := <-p.tapResponseChannel:
if res.TypeUrl == req.TypeUrl {
return res, nil
}
case <-time.After(timeout):
return nil, nil
}
}
}
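// makeTapHandler returns an HTTP handler that translates a /debug request into an xDS
// DiscoveryRequest, using the URL path as the type URL and the optional "resourceName"
// query parameter, then taps Istiod with a 5s timeout and writes the response as JSON.
// For example (assuming the debug server was started on port 15004, which is only an
// illustrative choice here):
//
//	curl localhost:15004/debug/syncz
//
// is forwarded to Istiod as a DiscoveryRequest with TypeUrl "istio.io/debug/syncz".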
func (p *XdsProxy) makeTapHandler() func(w http.ResponseWriter, req *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
qp, err := url.ParseQuery(req.URL.RawQuery)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "%v\n", err)
return
}
typeURL := fmt.Sprintf("istio.io%s", req.URL.Path)
dr := discovery.DiscoveryRequest{
TypeUrl: typeURL,
}
resourceName := qp.Get("resourceName")
if resourceName != "" {
dr.ResourceNames = []string{resourceName}
}
response, err := p.tapRequest(&dr, 5*time.Second)
if err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprintf(w, "%v\n", err)
return
}
if response == nil {
log.Infof("timed out waiting for Istiod to respond to %q", typeURL)
w.WriteHeader(http.StatusGatewayTimeout)
return
}
// Try to marshal Istiod's response using protojson (needed for Envoy protobufs)
w.Header().Add("Content-Type", "application/json")
b, err := protomarshal.MarshalIndent(response, " ")
if err == nil {
_, err = w.Write(b)
if err != nil {
log.Infof("fail to write debug response: %v", err)
}
return
}
// Failed as protobuf. Try as regular JSON
proxyLog.Warnf("could not marshal istiod response as pb: %v", err)
j, err := json.Marshal(response)
if err != nil {
// Couldn't marshal at all
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "%v\n", err)
return
}
_, err = w.Write(j)
if err != nil {
log.Infof("fail to write debug response: %v", err)
return
}
}
}
// initDebugInterface() listens on localhost:${PORT} for paths under /debug/,
// forwards them to Istiod as xDS requests,
// waits for the response from Istiod, and sends it back as JSON.
func (p *XdsProxy) initDebugInterface(port int) error {
p.tapResponseChannel = make(chan *discovery.DiscoveryResponse)
httpMux := http.NewServeMux()
handler := p.makeTapHandler()
httpMux.HandleFunc("/debug/", handler)
httpMux.HandleFunc("/debug", handler) // For 1.10 Istiod which uses istio.io/debug
p.httpTapServer = &http.Server{
Addr: fmt.Sprintf("localhost:%d", port),
Handler: httpMux,
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
ReadTimeout: 30 * time.Second,
}
// create HTTP listener
listener, err := net.Listen("tcp", p.httpTapServer.Addr)
if err != nil {
return err
}
go func() {
log.Infof("starting Http service at %s", listener.Addr())
if err := p.httpTapServer.Serve(listener); err != nil {
log.Errorf("error serving tap http server: %v", err)
}
}()
return nil
}