blob: e74bba986cd3b3c9449fc91f85988d39d80eaa1d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"context"
"github.com/apache/dubbo-kubernetes/pkg/maps"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/pkg/xds"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
v3 "github.com/apache/dubbo-kubernetes/sail/pkg/xds/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
"strconv"
"strings"
"sync/atomic"
"time"
)
var connectionNumber = int64(0)
type (
DiscoveryStream = xds.DiscoveryStream
DeltaDiscoveryStream = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
DeltaDiscoveryClient = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient
)
type Connection struct {
xds.Connection
node *core.Node
proxy *model.Proxy
deltaStream DeltaDiscoveryStream
deltaReqChan chan *discovery.DeltaDiscoveryRequest
s *DiscoveryServer
ids []string
}
type Event struct {
pushRequest *model.PushRequest
done func()
}
func (conn *Connection) XdsConnection() *xds.Connection {
return &conn.Connection
}
func (conn *Connection) Proxy() *model.Proxy {
return conn.proxy
}
func (s *DiscoveryServer) DeltaAggregatedResources(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return s.StreamDeltas(stream)
}
func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) error {
return s.Stream(stream)
}
func (s *DiscoveryServer) StartPush(req *model.PushRequest) {
req.Start = time.Now()
for _, p := range s.AllClients() {
s.pushQueue.Enqueue(p, req)
}
}
func (s *DiscoveryServer) AllClients() []*Connection {
s.adsClientsMutex.RLock()
defer s.adsClientsMutex.RUnlock()
return maps.Values(s.adsClients)
}
func (s *DiscoveryServer) AdsPushAll(req *model.PushRequest) {
if !req.Full {
klog.Infof("XDS: Incremental Pushing ConnectedEndpoints:%d", s.adsClientCount())
} else {
totalService := len(req.Push.GetAllServices())
klog.Infof("XDS: Pushing Services:%d ConnectedEndpoints:%d", totalService, s.adsClientCount())
if req.ConfigsUpdated == nil {
req.ConfigsUpdated = make(sets.Set[model.ConfigKey])
}
}
s.StartPush(req)
}
func (s *DiscoveryServer) adsClientCount() int {
s.adsClientsMutex.RLock()
defer s.adsClientsMutex.RUnlock()
return len(s.adsClients)
}
func (s *DiscoveryServer) computeProxyState(proxy *model.Proxy, request *model.PushRequest) {
proxy.Lock()
defer proxy.Unlock()
// 1. If request == nil(initiation phase) or request.ConfigsUpdated == nil(global push), set proxy serviceTargets.
// 2. otherwise only set when svc update, this is for the case that a service may select the proxy
if request == nil || request.Forced ||
proxy.ShouldUpdateServiceTargets(request.ConfigsUpdated) {
proxy.SetServiceTargets(s.Env.ServiceDiscovery)
}
recomputeLabels := request == nil || request.IsProxyUpdate()
if recomputeLabels {
}
push := proxy.LastPushContext
if request == nil {
} else {
push = request.Push
if request.Forced {
}
for conf := range request.ConfigsUpdated {
switch conf.Kind {
}
}
}
proxy.LastPushContext = push
if request != nil {
proxy.LastPushTime = request.Start
}
}
func connectionID(node string) string {
id := atomic.AddInt64(&connectionNumber, 1)
return node + "-" + strconv.FormatInt(id, 10)
}
func (s *DiscoveryServer) initConnection(node *core.Node, con *Connection, identities []string) error {
proxy, err := s.initProxyMetadata(node)
if err != nil {
return err
}
if alias, exists := s.ClusterAliases[proxy.Metadata.ClusterID]; exists {
proxy.Metadata.ClusterID = alias
}
proxy.LastPushContext = s.globalPushContext()
con.SetID(connectionID(proxy.ID))
con.node = node
con.proxy = proxy
if err := s.authorize(con, identities); err != nil {
return err
}
s.addCon(con.ID(), con)
defer con.MarkInitialized()
if err := s.initializeProxy(con); err != nil {
s.closeConnection(con)
return err
}
return nil
}
func (s *DiscoveryServer) closeConnection(con *Connection) {
if con.ID() == "" {
return
}
s.removeCon(con.ID())
}
func (s *DiscoveryServer) initializeProxy(con *Connection) error {
proxy := con.proxy
s.computeProxyState(proxy, nil)
proxy.WatchedResources = map[string]*model.WatchedResource{}
// Based on node metadata and version, we can associate a different generator.
if proxy.Metadata.Generator != "" {
proxy.XdsResourceGenerator = s.Generators[proxy.Metadata.Generator]
}
return nil
}
func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy, error) {
meta, err := model.ParseMetadata(node.Metadata)
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
}
proxy, err := model.ParseServiceNodeWithMetadata(node.Id, meta)
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
}
// Update the config namespace associated with this proxy
proxy.ConfigNamespace = model.GetProxyConfigNamespace(proxy)
proxy.XdsNode = node
return proxy, nil
}
func (s *DiscoveryServer) addCon(conID string, con *Connection) {
s.adsClientsMutex.Lock()
defer s.adsClientsMutex.Unlock()
s.adsClients[conID] = con
}
func (s *DiscoveryServer) removeCon(conID string) {
s.adsClientsMutex.Lock()
defer s.adsClientsMutex.Unlock()
if _, exist := s.adsClients[conID]; !exist {
klog.Errorf("ADS: Removing connection for non-existing node:%v.", conID)
} else {
delete(s.adsClients, conID)
}
}
func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
if !s.IsServerReady() {
return status.Error(codes.Unavailable, "server is not ready to serve discovery information")
}
ctx := stream.Context()
peerAddr := "0.0.0.0"
if peerInfo, ok := peer.FromContext(ctx); ok {
peerAddr = peerInfo.Addr.String()
}
// TODO WaitForRequestLimit?
if err := s.WaitForRequestLimit(stream.Context()); err != nil {
klog.Warningf("ADS: %q exceeded rate limit: %v", peerAddr, err)
return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
}
ids, err := s.authenticate(ctx)
if err != nil {
return status.Error(codes.Unauthenticated, err.Error())
}
if ids != nil {
klog.V(2).Infof("Authenticated XDS: %v with identity %v", peerAddr, ids)
} else {
klog.V(2).Infof("Unauthenticated XDS: %s", peerAddr)
}
s.globalPushContext().InitContext(s.Env, nil, nil)
con := newConnection(peerAddr, stream)
con.ids = ids
con.s = s
return xds.Stream(con)
}
func (s *DiscoveryServer) WaitForRequestLimit(ctx context.Context) error {
if s.RequestRateLimit.Limit() == 0 {
return nil
}
wait, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
return s.RequestRateLimit.Wait(wait)
}
func newConnection(peerAddr string, stream DiscoveryStream) *Connection {
return &Connection{
Connection: xds.NewConnection(peerAddr, stream),
}
}
var PushOrder = []string{
v3.ClusterType,
v3.EndpointType,
v3.ListenerType,
v3.RouteType,
v3.AddressType,
}
var KnownOrderedTypeUrls = sets.New(PushOrder...)
func (conn *Connection) watchedResourcesByOrder() []*model.WatchedResource {
allWatched := conn.proxy.ShallowCloneWatchedResources()
ordered := make([]*model.WatchedResource, 0, len(allWatched))
// first add all known types, in order
for _, tp := range PushOrder {
if allWatched[tp] != nil {
ordered = append(ordered, allWatched[tp])
}
}
// Then add any undeclared types
for tp, res := range allWatched {
if !KnownOrderedTypeUrls.Contains(tp) {
ordered = append(ordered, res)
}
}
return ordered
}
func (conn *Connection) Initialize(node *core.Node) error {
return conn.s.initConnection(node, conn, conn.ids)
}
func (conn *Connection) Close() {
conn.s.closeConnection(conn)
}
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
pushRequest := pushEv.pushRequest
if pushRequest.Full {
// Update Proxy with current information.
s.computeProxyState(con.proxy, pushRequest)
}
pushRequest, needsPush := s.ProxyNeedsPush(con.proxy, pushRequest)
if !needsPush {
klog.V(2).Infof("Skipping push to %v, no updates required", con.ID())
return nil
}
// Send pushes to all generators
// Each Generator is responsible for determining if the push event requires a push
wrl := con.watchedResourcesByOrder()
for _, w := range wrl {
if err := s.pushXds(con, w, pushRequest); err != nil {
return err
}
}
return nil
}
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
stype := v3.GetShortType(req.TypeUrl)
klog.V(2).Infof("ADS:%s: REQ %s resources:%d nonce:%s ", stype,
con.ID(), len(req.ResourceNames), req.ResponseNonce)
if req.TypeUrl == v3.HealthInfoType {
return nil
}
if strings.HasPrefix(req.TypeUrl, v3.DebugType) {
return s.pushXds(con,
&model.WatchedResource{TypeUrl: req.TypeUrl, ResourceNames: sets.New(req.ResourceNames...)},
&model.PushRequest{Full: true, Push: con.proxy.LastPushContext, Forced: true})
}
shouldRespond, delta := xds.ShouldRespond(con.proxy, con.ID(), req)
if !shouldRespond {
return nil
}
request := &model.PushRequest{
Full: true,
Push: con.proxy.LastPushContext,
Reason: model.NewReasonStats(model.ProxyRequest),
Start: con.proxy.LastPushTime,
Delta: delta,
Forced: true,
}
return s.pushXds(con, con.proxy.GetWatchedResource(req.TypeUrl), request)
}
func (conn *Connection) Push(ev any) error {
pushEv := ev.(*Event)
err := conn.s.pushConnection(conn, pushEv)
pushEv.done()
return err
}
func (conn *Connection) Process(req *discovery.DiscoveryRequest) error {
return conn.s.processRequest(req, conn)
}
func (conn *Connection) Watcher() xds.Watcher {
return conn.proxy
}