blob: 12ec6c7e5b20a63d88a03984e162148c432fc483 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package adsc
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"math"
"net"
"os"
"sort"
"strings"
"sync"
"time"
)
import (
"github.com/cenkalti/backoff/v4"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/conversion"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
any "google.golang.org/protobuf/types/known/anypb"
pstruct "google.golang.org/protobuf/types/known/structpb"
mcp "istio.io/api/mcp/v1alpha1"
"istio.io/api/mesh/v1alpha1"
"istio.io/pkg/log"
)
import (
mem "github.com/apache/dubbo-go-pixiu/pilot/pkg/config/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/memory"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/constants"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
"github.com/apache/dubbo-go-pixiu/pkg/security"
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
)
const (
defaultClientMaxReceiveMessageSize = math.MaxInt32
defaultInitialConnWindowSize = 1024 * 1024 // default gRPC InitialWindowSize
defaultInitialWindowSize = 1024 * 1024 // default gRPC ConnWindowSize
)
// Config for the ADS connection.
type Config struct {
// Namespace defaults to 'default'
Namespace string
// Workload defaults to 'test'
Workload string
// Revision for this control plane instance. We will only read configs that match this revision.
Revision string
// Meta includes additional metadata for the node
Meta *pstruct.Struct
Locality *core.Locality
// NodeType defaults to sidecar. "ingress" and "router" are also supported.
NodeType string
// IP is currently the primary key used to locate inbound configs. It is sent by client,
// must match a known endpoint IP. Tests can use a ServiceEntry to register fake IPs.
IP string
// CertDir is the directory where mTLS certs are configured.
// If CertDir and Secret are empty, an insecure connection will be used.
// TODO: implement SecretManager for cert dir
CertDir string
// Secrets is the interface used for getting keys and rootCA.
SecretManager security.SecretManager
// For getting the certificate, using same code as SDS server.
// Either the JWTPath or the certs must be present.
JWTPath string
// XDSSAN is the expected SAN of the XDS server. If not set, the ProxyConfig.DiscoveryAddress is used.
XDSSAN string
// XDSRootCAFile explicitly set the root CA to be used for the XDS connection.
// Mirrors Envoy file.
XDSRootCAFile string
// RootCert contains the XDS root certificate. Used mainly for tests, apps will normally use
// XDSRootCAFile
RootCert []byte
// InsecureSkipVerify skips client verification the server's certificate chain and host name.
InsecureSkipVerify bool
// InitialDiscoveryRequests is a list of resources to watch at first, represented as URLs (for new XDS resource naming)
// or type URLs.
InitialDiscoveryRequests []*discovery.DiscoveryRequest
// BackoffPolicy determines the reconnect policy. Based on MCP client.
BackoffPolicy backoff.BackOff
// ResponseHandler will be called on each DiscoveryResponse.
// TODO: mirror Generator, allow adding handler per type
ResponseHandler ResponseHandler
GrpcOpts []grpc.DialOption
}
func DefaultGrpcDialOptions() []grpc.DialOption {
return []grpc.DialOption{
// TODO(SpecialYang) maybe need to make it configurable.
grpc.WithInitialWindowSize(int32(defaultInitialWindowSize)),
grpc.WithInitialConnWindowSize(int32(defaultInitialConnWindowSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize)),
}
}
// ADSC implements a basic client for ADS, for use in stress tests and tools
// or libraries that need to connect to Istio pilot or other ADS servers.
type ADSC struct {
// Stream is the GRPC connection stream, allowing direct GRPC send operations.
// Set after Dial is called.
stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// xds client used to create a stream
client discovery.AggregatedDiscoveryServiceClient
conn *grpc.ClientConn
// Indicates if the ADSC client is closed
closed bool
// NodeID is the node identity sent to Pilot.
nodeID string
url string
watchTime time.Time
// InitialLoad tracks the time to receive the initial configuration.
InitialLoad time.Duration
// httpListeners contains received listeners with a http_connection_manager filter.
httpListeners map[string]*listener.Listener
// tcpListeners contains all listeners of type TCP (not-HTTP)
tcpListeners map[string]*listener.Listener
// All received clusters of type eds, keyed by name
edsClusters map[string]*cluster.Cluster
// All received clusters of no-eds type, keyed by name
clusters map[string]*cluster.Cluster
// All received routes, keyed by route name
routes map[string]*route.RouteConfiguration
// All received endpoints, keyed by cluster name
eds map[string]*endpoint.ClusterLoadAssignment
// Metadata has the node metadata to send to pilot.
// If nil, the defaults will be used.
Metadata *pstruct.Struct
// Updates includes the type of the last update received from the server.
Updates chan string
errChan chan error
XDSUpdates chan *discovery.DiscoveryResponse
VersionInfo map[string]string
// Last received message, by type
Received map[string]*discovery.DiscoveryResponse
mutex sync.RWMutex
Mesh *v1alpha1.MeshConfig
// Retrieved configurations can be stored using the common istio model interface.
Store model.ConfigStore
// Retrieved endpoints can be stored in the memory registry. This is used for CDS and EDS responses.
Registry *memory.ServiceDiscovery
// LocalCacheDir is set to a base name used to save fetched resources.
// If set, each update will be saved.
// TODO: also load at startup - so we can support warm up in init-container, and survive
// restarts.
LocalCacheDir string
// RecvWg is for letting goroutines know when the goroutine handling the ADS stream finishes.
RecvWg sync.WaitGroup
cfg *Config
// sendNodeMeta is set to true if the connection is new - and we need to send node meta.,
sendNodeMeta bool
sync map[string]time.Time
Locality *core.Locality
}
type ResponseHandler interface {
HandleResponse(con *ADSC, response *discovery.DiscoveryResponse)
}
// jsonMarshalProtoWithName wraps a proto.Message with name so it can be marshaled with the standard encoding/json library
type jsonMarshalProtoWithName struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Message proto.Message
}
func (p jsonMarshalProtoWithName) MarshalJSON() ([]byte, error) {
strSer, serr := protomarshal.ToJSONWithIndent(p.Message, " ")
if serr != nil {
adscLog.Warnf("Error for marshaling [%s]: %v", p.Name, serr)
return []byte(""), serr
}
serialItem := []byte("{\"" + p.Name + "\":" + strSer + "}")
return serialItem, nil
}
var adscLog = log.RegisterScope("adsc", "adsc debugging", 0)
func NewWithBackoffPolicy(discoveryAddr string, opts *Config, backoffPolicy backoff.BackOff) (*ADSC, error) {
adsc, err := New(discoveryAddr, opts)
if err != nil {
return nil, err
}
adsc.cfg.BackoffPolicy = backoffPolicy
return adsc, err
}
// New creates a new ADSC, maintaining a connection to an XDS server.
// Will:
// - get certificate using the Secret provider, if CertRequired
// - connect to the XDS server specified in ProxyConfig
// - send initial request for watched resources
// - wait for response from XDS server
// - on success, start a background thread to maintain the connection, with exp. backoff.
func New(discoveryAddr string, opts *Config) (*ADSC, error) {
if opts == nil {
opts = &Config{}
}
// We want to recreate stream
if opts.BackoffPolicy == nil {
opts.BackoffPolicy = backoff.NewExponentialBackOff()
}
adsc := &ADSC{
Updates: make(chan string, 100),
XDSUpdates: make(chan *discovery.DiscoveryResponse, 100),
VersionInfo: map[string]string{},
url: discoveryAddr,
Received: map[string]*discovery.DiscoveryResponse{},
RecvWg: sync.WaitGroup{},
cfg: opts,
sync: map[string]time.Time{},
errChan: make(chan error, 10),
}
if opts.Namespace == "" {
opts.Namespace = "default"
}
if opts.NodeType == "" {
opts.NodeType = "sidecar"
}
if opts.IP == "" {
opts.IP = getPrivateIPIfAvailable().String()
}
if opts.Workload == "" {
opts.Workload = "test-1"
}
adsc.Metadata = opts.Meta
adsc.Locality = opts.Locality
adsc.nodeID = fmt.Sprintf("%s~%s~%s.%s~%s.svc.%s", opts.NodeType, opts.IP,
opts.Workload, opts.Namespace, opts.Namespace, constants.DefaultKubernetesDomain)
if err := adsc.Dial(); err != nil {
return nil, err
}
return adsc, nil
}
// Dial connects to a ADS server, with optional MTLS authentication if a cert dir is specified.
func (a *ADSC) Dial() error {
opts := a.cfg
defaultGrpcDialOptions := DefaultGrpcDialOptions()
var grpcDialOptions []grpc.DialOption
grpcDialOptions = append(grpcDialOptions, defaultGrpcDialOptions...)
grpcDialOptions = append(grpcDialOptions, opts.GrpcOpts...)
var err error
// If we need MTLS - CertDir or Secrets provider is set.
if len(opts.CertDir) > 0 || opts.SecretManager != nil {
tlsCfg, err := a.tlsConfig()
if err != nil {
return err
}
creds := credentials.NewTLS(tlsCfg)
grpcDialOptions = append(grpcDialOptions, grpc.WithTransportCredentials(creds))
}
if len(grpcDialOptions) == len(defaultGrpcDialOptions) {
// Only disable transport security if the user didn't supply custom dial options
grpcDialOptions = append(grpcDialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
a.conn, err = grpc.Dial(a.url, grpcDialOptions...)
if err != nil {
return err
}
return nil
}
// Returns a private IP address, or unspecified IP (0.0.0.0) if no IP is available
func getPrivateIPIfAvailable() net.IP {
addrs, _ := net.InterfaceAddrs()
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
continue
}
if !ip.IsLoopback() {
return ip
}
}
return net.IPv4zero
}
func (a *ADSC) tlsConfig() (*tls.Config, error) {
var clientCerts []tls.Certificate
var serverCABytes []byte
var err error
getClientCertificate := getClientCertFn(a.cfg)
// Load the root CAs
if a.cfg.RootCert != nil {
serverCABytes = a.cfg.RootCert
} else if a.cfg.XDSRootCAFile != "" {
serverCABytes, err = os.ReadFile(a.cfg.XDSRootCAFile)
} else if a.cfg.SecretManager != nil {
// This is a bit crazy - we could just use the file
rootCA, err := a.cfg.SecretManager.GenerateSecret(security.RootCertReqResourceName)
if err != nil {
return nil, err
}
serverCABytes = rootCA.RootCert
} else if a.cfg.CertDir != "" {
serverCABytes, err = os.ReadFile(a.cfg.CertDir + "/root-cert.pem")
if err != nil {
return nil, err
}
}
serverCAs := x509.NewCertPool()
if ok := serverCAs.AppendCertsFromPEM(serverCABytes); !ok {
return nil, err
}
shost, _, _ := net.SplitHostPort(a.url)
if a.cfg.XDSSAN != "" {
shost = a.cfg.XDSSAN
}
return &tls.Config{
GetClientCertificate: getClientCertificate,
Certificates: clientCerts,
RootCAs: serverCAs,
ServerName: shost,
InsecureSkipVerify: a.cfg.InsecureSkipVerify,
}, nil
}
// Close the stream.
func (a *ADSC) Close() {
a.mutex.Lock()
_ = a.conn.Close()
a.closed = true
a.mutex.Unlock()
}
// Run will create a new stream using the existing grpc client connection and send the initial xds requests.
// And then it will run a go routine receiving and handling xds response.
// Note: it is non blocking
func (a *ADSC) Run() error {
var err error
a.client = discovery.NewAggregatedDiscoveryServiceClient(a.conn)
a.stream, err = a.client.StreamAggregatedResources(context.Background())
if err != nil {
return err
}
a.sendNodeMeta = true
a.InitialLoad = 0
// Send the initial requests
for _, r := range a.cfg.InitialDiscoveryRequests {
if r.TypeUrl == v3.ClusterType {
a.watchTime = time.Now()
}
_ = a.Send(r)
}
// by default, we assume 1 goroutine decrements the waitgroup (go a.handleRecv()).
// for synchronizing when the goroutine finishes reading from the gRPC stream.
a.RecvWg.Add(1)
go a.handleRecv()
return nil
}
// HasSynced returns true if MCP configs have synced
func (a *ADSC) HasSynced() bool {
if a.cfg == nil || len(a.cfg.InitialDiscoveryRequests) == 0 {
return true
}
a.mutex.RLock()
defer a.mutex.RUnlock()
for _, req := range a.cfg.InitialDiscoveryRequests {
if strings.Count(req.TypeUrl, "/") != 3 {
continue
}
if _, ok := a.sync[req.TypeUrl]; !ok {
return false
}
}
return true
}
// reconnect will create a new stream
func (a *ADSC) reconnect() {
a.mutex.RLock()
if a.closed {
a.mutex.RUnlock()
return
}
a.mutex.RUnlock()
err := a.Run()
if err == nil {
a.cfg.BackoffPolicy.Reset()
} else {
time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
}
}
func (a *ADSC) handleRecv() {
for {
var err error
msg, err := a.stream.Recv()
if err != nil {
a.RecvWg.Done()
adscLog.Infof("Connection closed for node %v with err: %v", a.nodeID, err)
select {
case a.errChan <- err:
default:
}
// if 'reconnect' enabled - schedule a new Run
if a.cfg.BackoffPolicy != nil {
time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
} else {
a.Close()
a.WaitClear()
a.Updates <- ""
a.XDSUpdates <- nil
close(a.errChan)
}
return
}
// Group-value-kind - used for high level api generator.
gvk := strings.SplitN(msg.TypeUrl, "/", 3)
adscLog.Info("Received ", a.url, " type ", msg.TypeUrl,
" cnt=", len(msg.Resources), " nonce=", msg.Nonce)
if a.cfg.ResponseHandler != nil {
a.cfg.ResponseHandler.HandleResponse(a, msg)
}
if msg.TypeUrl == collections.IstioMeshV1Alpha1MeshConfig.Resource().GroupVersionKind().String() &&
len(msg.Resources) > 0 {
rsc := msg.Resources[0]
m := &v1alpha1.MeshConfig{}
err = proto.Unmarshal(rsc.Value, m)
if err != nil {
adscLog.Warn("Failed to unmarshal mesh config", err)
}
a.Mesh = m
if a.LocalCacheDir != "" {
strResponse, err := protomarshal.ToJSONWithIndent(m, " ")
if err != nil {
continue
}
err = os.WriteFile(a.LocalCacheDir+"_mesh.json", []byte(strResponse), 0o644)
if err != nil {
continue
}
}
continue
}
// Process the resources.
a.VersionInfo[msg.TypeUrl] = msg.VersionInfo
switch msg.TypeUrl {
case v3.ListenerType:
listeners := make([]*listener.Listener, 0, len(msg.Resources))
for _, rsc := range msg.Resources {
valBytes := rsc.Value
ll := &listener.Listener{}
_ = proto.Unmarshal(valBytes, ll)
listeners = append(listeners, ll)
}
a.handleLDS(listeners)
case v3.ClusterType:
clusters := make([]*cluster.Cluster, 0, len(msg.Resources))
for _, rsc := range msg.Resources {
valBytes := rsc.Value
cl := &cluster.Cluster{}
_ = proto.Unmarshal(valBytes, cl)
clusters = append(clusters, cl)
}
a.handleCDS(clusters)
case v3.EndpointType:
eds := make([]*endpoint.ClusterLoadAssignment, 0, len(msg.Resources))
for _, rsc := range msg.Resources {
valBytes := rsc.Value
el := &endpoint.ClusterLoadAssignment{}
_ = proto.Unmarshal(valBytes, el)
eds = append(eds, el)
}
a.handleEDS(eds)
case v3.RouteType:
routes := make([]*route.RouteConfiguration, 0, len(msg.Resources))
for _, rsc := range msg.Resources {
valBytes := rsc.Value
rl := &route.RouteConfiguration{}
_ = proto.Unmarshal(valBytes, rl)
routes = append(routes, rl)
}
a.handleRDS(routes)
default:
a.handleMCP(gvk, msg.Resources)
}
// If we got no resource - still save to the store with empty name/namespace, to notify sync
// This scheme also allows us to chunk large responses !
// TODO: add hook to inject nacks
a.mutex.Lock()
if len(gvk) == 3 {
gt := config.GroupVersionKind{Group: gvk[0], Version: gvk[1], Kind: gvk[2]}
if _, exist := a.sync[gt.String()]; !exist {
a.sync[gt.String()] = time.Now()
}
}
a.Received[msg.TypeUrl] = msg
a.ack(msg)
a.mutex.Unlock()
select {
case a.XDSUpdates <- msg:
default:
}
}
}
func (a *ADSC) mcpToPilot(m *mcp.Resource) (*config.Config, error) {
if m == nil || m.Metadata == nil {
return &config.Config{}, nil
}
c := &config.Config{
Meta: config.Meta{
ResourceVersion: m.Metadata.Version,
Labels: m.Metadata.Labels,
Annotations: m.Metadata.Annotations,
},
}
if !config.ObjectInRevision(c, a.cfg.Revision) { // In case upstream does not support rev in node meta.
return nil, nil
}
if c.Meta.Annotations == nil {
c.Meta.Annotations = make(map[string]string)
}
nsn := strings.Split(m.Metadata.Name, "/")
if len(nsn) != 2 {
return nil, fmt.Errorf("invalid name %s", m.Metadata.Name)
}
c.Namespace = nsn[0]
c.Name = nsn[1]
var err error
c.CreationTimestamp = m.Metadata.CreateTime.AsTime()
pb, err := m.Body.UnmarshalNew()
if err != nil {
return nil, err
}
c.Spec = pb
return c, nil
}
// nolint: staticcheck
func (a *ADSC) handleLDS(ll []*listener.Listener) {
lh := map[string]*listener.Listener{}
lt := map[string]*listener.Listener{}
routes := []string{}
ldsSize := 0
for _, l := range ll {
ldsSize += proto.Size(l)
// The last filter is the actual destination for inbound listener
if l.ApiListener != nil {
// This is an API Listener
// TODO: extract VIP and RDS or cluster
continue
}
fc := l.FilterChains[len(l.FilterChains)-1]
// Find the terminal filter
filter := fc.Filters[len(fc.Filters)-1]
// The actual destination will be the next to the last if the last filter is a passthrough filter
if fc.GetName() == util.PassthroughFilterChain {
fc = l.FilterChains[len(l.FilterChains)-2]
filter = fc.Filters[len(fc.Filters)-1]
}
switch filter.Name {
case wellknown.TCPProxy:
lt[l.Name] = l
config, _ := conversion.MessageToStruct(filter.GetTypedConfig())
c := config.Fields["cluster"].GetStringValue()
adscLog.Debugf("TCP: %s -> %s", l.Name, c)
case wellknown.HTTPConnectionManager:
lh[l.Name] = l
// Getting from config is too painful..
port := l.Address.GetSocketAddress().GetPortValue()
if port == 15002 {
routes = append(routes, "http_proxy")
} else {
routes = append(routes, fmt.Sprintf("%d", port))
}
case wellknown.MongoProxy:
// ignore for now
case wellknown.RedisProxy:
// ignore for now
case wellknown.MySQLProxy:
// ignore for now
default:
adscLog.Infof(protomarshal.ToJSONWithIndent(l, " "))
}
}
adscLog.Infof("LDS: http=%d tcp=%d size=%d", len(lh), len(lt), ldsSize)
if adscLog.DebugEnabled() {
b, _ := json.MarshalIndent(ll, " ", " ")
adscLog.Debugf(string(b))
}
a.mutex.Lock()
defer a.mutex.Unlock()
if len(routes) > 0 {
a.sendRsc(v3.RouteType, routes)
}
a.httpListeners = lh
a.tcpListeners = lt
select {
case a.Updates <- v3.ListenerType:
default:
}
}
// Save will save the json configs to files, using the base directory
func (a *ADSC) Save(base string) error {
a.mutex.Lock()
defer a.mutex.Unlock()
// guarrante the persistence order for each element in tcpListeners
var sortTCPListeners []string
for key := range a.tcpListeners {
sortTCPListeners = append(sortTCPListeners, key)
}
sort.Strings(sortTCPListeners)
arrTCPListenersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortTCPListeners))
for _, element := range sortTCPListeners {
sliceItem := &jsonMarshalProtoWithName{element, a.tcpListeners[element]}
arrTCPListenersJSONProto = append(arrTCPListenersJSONProto, *sliceItem)
}
byteJSONResponse, err := json.MarshalIndent(arrTCPListenersJSONProto, "", " ")
if err != nil {
adscLog.Warnf("Error for marshaling TCPListeners: %v", err)
}
err = os.WriteFile(base+"_lds_tcp.json", byteJSONResponse, 0o644)
if err != nil {
return err
}
// guarrante the persistence order for each element in httpListeners
var sortHTTPListeners []string
for key := range a.httpListeners {
sortHTTPListeners = append(sortHTTPListeners, key)
}
sort.Strings(sortHTTPListeners)
arrHTTPListenersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortHTTPListeners))
for _, element := range sortHTTPListeners {
sliceItem := &jsonMarshalProtoWithName{element, a.httpListeners[element]}
arrHTTPListenersJSONProto = append(arrHTTPListenersJSONProto, *sliceItem)
}
byteJSONResponse, err = json.MarshalIndent(arrHTTPListenersJSONProto, "", " ")
if err != nil {
return err
}
err = os.WriteFile(base+"_lds_http.json", byteJSONResponse, 0o644)
if err != nil {
return err
}
// guarrante the persistence order for each element in routes
var sortRoutes []string
for key := range a.routes {
sortRoutes = append(sortRoutes, key)
}
sort.Strings(sortRoutes)
arrRoutesJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortRoutes))
for _, element := range sortRoutes {
sliceItem := &jsonMarshalProtoWithName{element, a.routes[element]}
arrRoutesJSONProto = append(arrRoutesJSONProto, *sliceItem)
}
byteJSONResponse, err = json.MarshalIndent(arrRoutesJSONProto, "", " ")
if err != nil {
return err
}
err = os.WriteFile(base+"_rds.json", byteJSONResponse, 0o644)
if err != nil {
return err
}
// guarrante the persistence order for each element in edsClusters
var sortEdsClusters []string
for key := range a.edsClusters {
sortEdsClusters = append(sortEdsClusters, key)
}
sort.Strings(sortEdsClusters)
arrEdsClustersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortEdsClusters))
for _, element := range sortEdsClusters {
sliceItem := &jsonMarshalProtoWithName{element, a.edsClusters[element]}
arrEdsClustersJSONProto = append(arrEdsClustersJSONProto, *sliceItem)
}
byteJSONResponse, err = json.MarshalIndent(arrEdsClustersJSONProto, "", " ")
if err != nil {
return err
}
err = os.WriteFile(base+"_ecds.json", byteJSONResponse, 0o644)
if err != nil {
return err
}
// guarrante the persistence order for each element in clusters
var sortClusters []string
for key := range a.clusters {
sortClusters = append(sortClusters, key)
}
sort.Strings(sortClusters)
arrClustersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortClusters))
for _, element := range sortClusters {
sliceItem := &jsonMarshalProtoWithName{element, a.clusters[element]}
arrClustersJSONProto = append(arrClustersJSONProto, *sliceItem)
}
byteJSONResponse, err = json.MarshalIndent(arrClustersJSONProto, "", " ")
if err != nil {
return err
}
err = os.WriteFile(base+"_cds.json", byteJSONResponse, 0o644)
if err != nil {
return err
}
// guarrante the persistence order for each element in eds
var sortEds []string
for key := range a.eds {
sortEds = append(sortEds, key)
}
sort.Strings(sortEds)
arrEdsJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortEds))
for _, element := range sortEds {
sliceItem := &jsonMarshalProtoWithName{element, a.eds[element]}
arrEdsJSONProto = append(arrEdsJSONProto, *sliceItem)
}
byteJSONResponse, err = json.MarshalIndent(arrEdsJSONProto, "", " ")
if err != nil {
return err
}
err = os.WriteFile(base+"_eds.json", byteJSONResponse, 0o644)
if err != nil {
return err
}
return err
}
func (a *ADSC) handleCDS(ll []*cluster.Cluster) {
cn := make([]string, 0, len(ll))
cdsSize := 0
edscds := map[string]*cluster.Cluster{}
cds := map[string]*cluster.Cluster{}
for _, c := range ll {
cdsSize += proto.Size(c)
switch v := c.ClusterDiscoveryType.(type) {
case *cluster.Cluster_Type:
if v.Type != cluster.Cluster_EDS {
cds[c.Name] = c
continue
}
}
cn = append(cn, c.Name)
edscds[c.Name] = c
}
adscLog.Infof("CDS: %d size=%d", len(cn), cdsSize)
if len(cn) > 0 {
a.sendRsc(v3.EndpointType, cn)
}
if adscLog.DebugEnabled() {
b, _ := json.MarshalIndent(ll, " ", " ")
adscLog.Debugf(string(b))
}
a.mutex.Lock()
defer a.mutex.Unlock()
a.edsClusters = edscds
a.clusters = cds
select {
case a.Updates <- v3.ClusterType:
default:
}
}
func (a *ADSC) node() *core.Node {
n := &core.Node{
Id: a.nodeID,
Locality: a.Locality,
}
if a.Metadata == nil {
n.Metadata = &pstruct.Struct{
Fields: map[string]*pstruct.Value{
"ISTIO_VERSION": {Kind: &pstruct.Value_StringValue{StringValue: "65536.65536.65536"}},
},
}
} else {
n.Metadata = a.Metadata
if a.Metadata.Fields["ISTIO_VERSION"] == nil {
a.Metadata.Fields["ISTIO_VERSION"] = &pstruct.Value{Kind: &pstruct.Value_StringValue{StringValue: "65536.65536.65536"}}
}
}
return n
}
// Raw send of a request.
func (a *ADSC) Send(req *discovery.DiscoveryRequest) error {
if a.sendNodeMeta {
req.Node = a.node()
a.sendNodeMeta = false
}
req.ResponseNonce = time.Now().String()
if adscLog.DebugEnabled() {
strReq, _ := protomarshal.ToJSONWithIndent(req, " ")
adscLog.Debugf("Sending Discovery Request to istiod: %s", strReq)
}
return a.stream.Send(req)
}
func (a *ADSC) handleEDS(eds []*endpoint.ClusterLoadAssignment) {
la := map[string]*endpoint.ClusterLoadAssignment{}
edsSize := 0
ep := 0
for _, cla := range eds {
edsSize += proto.Size(cla)
la[cla.ClusterName] = cla
ep += len(cla.Endpoints)
}
adscLog.Infof("eds: %d size=%d ep=%d", len(eds), edsSize, ep)
if adscLog.DebugEnabled() {
b, _ := json.MarshalIndent(eds, " ", " ")
adscLog.Debugf(string(b))
}
if a.InitialLoad == 0 {
// first load - Envoy loads listeners after endpoints
_ = a.stream.Send(&discovery.DiscoveryRequest{
Node: a.node(),
TypeUrl: v3.ListenerType,
})
}
a.mutex.Lock()
defer a.mutex.Unlock()
a.eds = la
select {
case a.Updates <- v3.EndpointType:
default:
}
}
func (a *ADSC) handleRDS(configurations []*route.RouteConfiguration) {
vh := 0
rcount := 0
size := 0
rds := map[string]*route.RouteConfiguration{}
for _, r := range configurations {
for _, h := range r.VirtualHosts {
vh++
for _, rt := range h.Routes {
rcount++
// Example: match:<prefix:"/" > route:<cluster:"outbound|9154||load-se-154.local" ...
adscLog.Debugf("Handle route %v, path %v, cluster %v", h.Name, rt.Match.PathSpecifier, rt.GetRoute().GetCluster())
}
}
rds[r.Name] = r
size += proto.Size(r)
}
if a.InitialLoad == 0 {
a.InitialLoad = time.Since(a.watchTime)
adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d time=%d", len(configurations), size, vh, rcount, a.InitialLoad)
} else {
adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d", len(configurations), size, vh, rcount)
}
if adscLog.DebugEnabled() {
b, _ := json.MarshalIndent(configurations, " ", " ")
adscLog.Debugf(string(b))
}
a.mutex.Lock()
a.routes = rds
a.mutex.Unlock()
select {
case a.Updates <- v3.RouteType:
default:
}
}
// WaitClear will clear the waiting events, so next call to Wait will get
// the next push type.
func (a *ADSC) WaitClear() {
for {
select {
case <-a.Updates:
default:
return
}
}
}
// WaitSingle waits for a single resource, and fails if the rejected type is
// returned. We avoid rejecting all other types to avoid race conditions. For
// example, a test asserting an incremental update of EDS may fail if a previous
// push's RDS response comes in later. Instead, we can reject events coming
// before (ie CDS). The only real alternative is to wait which introduces its own
// issues.
func (a *ADSC) WaitSingle(to time.Duration, want string, reject string) error {
t := time.NewTimer(to)
for {
select {
case t := <-a.Updates:
if t == "" {
return fmt.Errorf("closed")
}
if t != want && t == reject {
return fmt.Errorf("wanted update for %v got %v", want, t)
}
if t == want {
return nil
}
continue
case <-t.C:
return fmt.Errorf("timeout, still waiting for update for %v", want)
}
}
}
// Wait for an updates for all the specified types
// If updates is empty, this will wait for any update
func (a *ADSC) Wait(to time.Duration, updates ...string) ([]string, error) {
t := time.NewTimer(to)
want := map[string]struct{}{}
for _, update := range updates {
want[update] = struct{}{}
}
got := make([]string, 0, len(updates))
for {
select {
case toDelete := <-a.Updates:
if toDelete == "" {
return got, fmt.Errorf("closed")
}
delete(want, toDelete)
got = append(got, toDelete)
if len(want) == 0 {
return got, nil
}
case <-t.C:
return got, fmt.Errorf("timeout, still waiting for updates: %v", want)
}
}
}
// WaitVersion waits for a new or updated for a typeURL.
func (a *ADSC) WaitVersion(to time.Duration, typeURL, lastVersion string) (*discovery.DiscoveryResponse, error) {
t := time.NewTimer(to)
a.mutex.Lock()
ex := a.Received[typeURL]
a.mutex.Unlock()
if ex != nil {
if lastVersion == "" {
return ex, nil
}
if lastVersion != ex.VersionInfo {
return ex, nil
}
}
for {
select {
case t := <-a.XDSUpdates:
if t == nil {
return nil, fmt.Errorf("closed")
}
if t.TypeUrl == typeURL {
return t, nil
}
case <-t.C:
return nil, fmt.Errorf("timeout, still waiting for updates: %v", typeURL)
case err, ok := <-a.errChan:
if ok {
return nil, err
}
return nil, fmt.Errorf("connection closed")
}
}
}
// EndpointsJSON returns the endpoints, formatted as JSON, for debugging.
func (a *ADSC) EndpointsJSON() string {
a.mutex.Lock()
defer a.mutex.Unlock()
out, _ := json.MarshalIndent(a.eds, " ", " ")
return string(out)
}
// Watch will start watching resources, starting with CDS. Based on the CDS response
// it will start watching RDS and LDS.
func (a *ADSC) Watch() {
a.watchTime = time.Now()
_ = a.stream.Send(&discovery.DiscoveryRequest{
Node: a.node(),
TypeUrl: v3.ClusterType,
})
}
func ConfigInitialRequests() []*discovery.DiscoveryRequest {
out := make([]*discovery.DiscoveryRequest, 0, len(collections.Pilot.All())+1)
out = append(out, &discovery.DiscoveryRequest{
TypeUrl: collections.IstioMeshV1Alpha1MeshConfig.Resource().GroupVersionKind().String(),
})
for _, sch := range collections.Pilot.All() {
out = append(out, &discovery.DiscoveryRequest{
TypeUrl: sch.Resource().GroupVersionKind().String(),
})
}
return out
}
// WatchConfig will use the new experimental API watching, similar with MCP.
func (a *ADSC) WatchConfig() {
_ = a.stream.Send(&discovery.DiscoveryRequest{
ResponseNonce: time.Now().String(),
Node: a.node(),
TypeUrl: collections.IstioMeshV1Alpha1MeshConfig.Resource().GroupVersionKind().String(),
})
for _, sch := range collections.Pilot.All() {
_ = a.stream.Send(&discovery.DiscoveryRequest{
ResponseNonce: time.Now().String(),
Node: a.node(),
TypeUrl: sch.Resource().GroupVersionKind().String(),
})
}
}
func (a *ADSC) sendRsc(typeurl string, rsc []string) {
ex := a.Received[typeurl]
version := ""
nonce := ""
if ex != nil {
version = ex.VersionInfo
nonce = ex.Nonce
}
_ = a.stream.Send(&discovery.DiscoveryRequest{
ResponseNonce: nonce,
VersionInfo: version,
Node: a.node(),
TypeUrl: typeurl,
ResourceNames: rsc,
})
}
func (a *ADSC) ack(msg *discovery.DiscoveryResponse) {
var resources []string
if msg.TypeUrl == v3.EndpointType {
for c := range a.edsClusters {
resources = append(resources, c)
}
}
if msg.TypeUrl == v3.RouteType {
for r := range a.routes {
resources = append(resources, r)
}
}
_ = a.stream.Send(&discovery.DiscoveryRequest{
ResponseNonce: msg.Nonce,
TypeUrl: msg.TypeUrl,
Node: a.node(),
VersionInfo: msg.VersionInfo,
ResourceNames: resources,
})
}
// GetHTTPListeners returns all the http listeners.
func (a *ADSC) GetHTTPListeners() map[string]*listener.Listener {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.httpListeners
}
// GetTCPListeners returns all the tcp listeners.
func (a *ADSC) GetTCPListeners() map[string]*listener.Listener {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.tcpListeners
}
// GetEdsClusters returns all the eds type clusters.
func (a *ADSC) GetEdsClusters() map[string]*cluster.Cluster {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.edsClusters
}
// GetClusters returns all the non-eds type clusters.
func (a *ADSC) GetClusters() map[string]*cluster.Cluster {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.clusters
}
// GetRoutes returns all the routes.
func (a *ADSC) GetRoutes() map[string]*route.RouteConfiguration {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.routes
}
// GetEndpoints returns all the routes.
func (a *ADSC) GetEndpoints() map[string]*endpoint.ClusterLoadAssignment {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.eds
}
func (a *ADSC) handleMCP(gvk []string, resources []*any.Any) {
if len(gvk) != 3 {
return // Not MCP
}
// Generic - fill up the store
if a.Store == nil {
return
}
groupVersionKind := config.GroupVersionKind{Group: gvk[0], Version: gvk[1], Kind: gvk[2]}
existingConfigs, err := a.Store.List(groupVersionKind, "")
if err != nil {
adscLog.Warnf("Error listing existing configs %v", err)
return
}
received := make(map[string]*config.Config)
for _, rsc := range resources {
m := &mcp.Resource{}
err := rsc.UnmarshalTo(m)
if err != nil {
adscLog.Warnf("Error unmarshalling received MCP config %v", err)
continue
}
newCfg, err := a.mcpToPilot(m)
if err != nil {
adscLog.Warn("Invalid data ", err, " ", string(rsc.Value))
continue
}
if newCfg == nil {
continue
}
received[newCfg.Namespace+"/"+newCfg.Name] = newCfg
newCfg.GroupVersionKind = groupVersionKind
oldCfg := a.Store.Get(newCfg.GroupVersionKind, newCfg.Name, newCfg.Namespace)
if oldCfg == nil {
if _, err = a.Store.Create(*newCfg); err != nil {
adscLog.Warnf("Error adding a new resource to the store %v", err)
continue
}
} else if oldCfg.ResourceVersion != newCfg.ResourceVersion || newCfg.ResourceVersion == "" {
// update the store only when resource version differs or unset.
newCfg.Annotations[mem.ResourceVersion] = newCfg.ResourceVersion
newCfg.ResourceVersion = oldCfg.ResourceVersion
if _, err = a.Store.Update(*newCfg); err != nil {
adscLog.Warnf("Error updating an existing resource in the store %v", err)
continue
}
}
if a.LocalCacheDir != "" {
strResponse, err := json.MarshalIndent(newCfg, " ", " ")
if err != nil {
adscLog.Warnf("Error marshaling received MCP config %v", err)
continue
}
err = os.WriteFile(a.LocalCacheDir+"_res."+
newCfg.GroupVersionKind.Kind+"."+newCfg.Namespace+"."+newCfg.Name+".json", strResponse, 0o644)
if err != nil {
adscLog.Warnf("Error writing received MCP config to local file %v", err)
}
}
}
// remove deleted resources from cache
for _, config := range existingConfigs {
if _, ok := received[config.Namespace+"/"+config.Name]; !ok {
if err := a.Store.Delete(config.GroupVersionKind, config.Name, config.Namespace, nil); err != nil {
adscLog.Warnf("Error deleting an outdated resource from the store %v", err)
continue
}
if a.LocalCacheDir != "" {
err = os.Remove(a.LocalCacheDir + "_res." +
config.GroupVersionKind.Kind + "." + config.Namespace + "." + config.Name + ".json")
if err != nil {
adscLog.Warnf("Error deleting received MCP config to local file %v", err)
}
}
}
}
}