blob: 8de5422b3ebec51ed1248de9ca01ddfffbfef1ec [file] [log] [blame]
package xds
import (
"encoding/json"
"github.com/apache/dubbo-kubernetes/pkg/env"
"github.com/apache/dubbo-kubernetes/pkg/lazy"
dubboversion "github.com/apache/dubbo-kubernetes/pkg/version"
"github.com/apache/dubbo-kubernetes/pkg/xds"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"k8s.io/klog/v2"
"strconv"
"strings"
)
type IstioControlPlaneInstance struct {
// The Istio component type (e.g. "istiod")
Component string
// The ID of the component instance
ID string
// The Istio version
Info dubboversion.BuildInfo
}
var controlPlane = lazy.New(func() (*core.ControlPlane, error) {
// The Pod Name (instance identity) is in PilotArgs, but not reachable globally nor from DiscoveryServer
podName := env.Register("POD_NAME", "", "").Get()
byVersion, err := json.Marshal(IstioControlPlaneInstance{
Component: "dubbod",
ID: podName,
Info: dubboversion.Info,
})
if err != nil {
klog.Warningf("XDS: Could not serialize control plane id: %v", err)
}
return &core.ControlPlane{Identifier: string(byVersion)}, nil
})
func ControlPlane(typ string) *core.ControlPlane {
// Error will never happen because the getter of lazy does not return error.
cp, _ := controlPlane.Get()
return cp
}
func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req *model.PushRequest) error {
if w == nil {
return nil
}
gen := s.findGenerator(w.TypeUrl, con)
if gen == nil {
return nil
}
// If delta is set, client is requesting new resources or removing old ones. We should just generate the
// new resources it needs, rather than the entire set of known resources.
// Note: we do not need to account for unsubscribed resources as these are handled by parent removal;
// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#deleting-resources.
// This means if there are only removals, we will not respond.
var logFiltered string
if !req.Delta.IsEmpty() && !con.proxy.IsProxylessGrpc() {
logFiltered = " filtered:" + strconv.Itoa(len(w.ResourceNames)-len(req.Delta.Subscribed))
w = &model.WatchedResource{
TypeUrl: w.TypeUrl,
ResourceNames: req.Delta.Subscribed,
}
}
res, logdata, err := gen.Generate(con.proxy, w, req)
info := ""
if len(logdata.AdditionalInfo) > 0 {
info = " " + logdata.AdditionalInfo
}
if len(logFiltered) > 0 {
info += logFiltered
}
if err != nil || res == nil {
return err
}
resp := &discovery.DiscoveryResponse{
TypeUrl: w.TypeUrl,
Resources: xds.ResourcesToAny(res),
}
if err := xds.Send(con, resp); err != nil {
return err
}
return nil
}
func (s *DiscoveryServer) findGenerator(typeURL string, con *Connection) model.XdsResourceGenerator {
if g, f := s.Generators[con.proxy.Metadata.Generator+"/"+typeURL]; f {
return g
}
if g, f := s.Generators[typeURL]; f {
return g
}
// XdsResourceGenerator is the default generator for this connection. We want to allow
// some types to use custom generators - for example EDS.
g := con.proxy.XdsResourceGenerator
if g == nil {
if strings.HasPrefix(typeURL, TypeDebugPrefix) {
g = s.Generators["event"]
} else {
// TODO move this to just directly using the resource TypeUrl
g = s.Generators["api"] // default to "MCP" generators - any type supported by store
}
}
return g
}