blob: 46e91f1d671ec9539d8e45f146cb3345881501e0 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds
import (
"encoding/json"
"strconv"
"strings"
"time"
)
import (
corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"istio.io/pkg/env"
istioversion "istio.io/pkg/version"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
)
// IstioControlPlaneInstance defines the format Istio uses for when creating Envoy config.core.v3.ControlPlane.identifier
type IstioControlPlaneInstance struct {
// The Istio component type (e.g. "istiod")
Component string
// The ID of the component instance
ID string
// The Istio version
Info istioversion.BuildInfo
}
var controlPlane *corev3.ControlPlane
// ControlPlane identifies the instance and Istio version.
func ControlPlane() *corev3.ControlPlane {
return controlPlane
}
func init() {
// The Pod Name (instance identity) is in PilotArgs, but not reachable globally nor from DiscoveryServer
podName := env.RegisterStringVar("POD_NAME", "", "").Get()
byVersion, err := json.Marshal(IstioControlPlaneInstance{
Component: "istiod",
ID: podName,
Info: istioversion.Info,
})
if err != nil {
log.Warnf("XDS: Could not serialize control plane id: %v", err)
}
controlPlane = &corev3.ControlPlane{Identifier: string(byVersion)}
}
func (s *DiscoveryServer) findGenerator(typeURL string, con *Connection) model.XdsResourceGenerator {
if g, f := s.Generators[con.proxy.Metadata.Generator+"/"+typeURL]; f {
return g
}
if g, f := s.Generators[typeURL]; f {
return g
}
// XdsResourceGenerator is the default generator for this connection. We want to allow
// some types to use custom generators - for example EDS.
g := con.proxy.XdsResourceGenerator
if g == nil {
if strings.HasPrefix(typeURL, "istio.io/debug/") {
g = s.Generators["event"]
} else {
// TODO move this to just directly using the resource TypeUrl
g = s.Generators["api"] // default to "MCP" generators - any type supported by store
}
}
return g
}
// Push an XDS resource for the given connection. Configuration will be generated
// based on the passed in generator. Based on the updates field, generators may
// choose to send partial or even no response if there are no changes.
func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req *model.PushRequest) error {
if w == nil {
return nil
}
gen := s.findGenerator(w.TypeUrl, con)
if gen == nil {
return nil
}
t0 := time.Now()
// If delta is set, client is requesting new resources or removing old ones. We should just generate the
// new resources it needs, rather than the entire set of known resources.
// Note: we do not need to account for unsubscribed resources as these are handled by parent removal;
// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#deleting-resources.
// This means if there are only removals, we will not respond.
var logFiltered string
if !req.Delta.IsEmpty() && features.PartialFullPushes &&
!con.proxy.IsProxylessGrpc() {
logFiltered = " filtered:" + strconv.Itoa(len(w.ResourceNames)-len(req.Delta.Subscribed))
w = &model.WatchedResource{
TypeUrl: w.TypeUrl,
ResourceNames: req.Delta.Subscribed.UnsortedList(),
}
}
res, logdata, err := gen.Generate(con.proxy, w, req)
if err != nil || res == nil {
// If we have nothing to send, report that we got an ACK for this version.
if s.StatusReporter != nil {
s.StatusReporter.RegisterEvent(con.conID, w.TypeUrl, req.Push.LedgerVersion)
}
return err
}
defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()
resp := &discovery.DiscoveryResponse{
ControlPlane: ControlPlane(),
TypeUrl: w.TypeUrl,
// TODO: send different version for incremental eds
VersionInfo: req.Push.PushVersion,
Nonce: nonce(req.Push.LedgerVersion),
Resources: model.ResourcesToAny(res),
}
configSize := ResourceSize(res)
configSizeBytes.With(typeTag.Value(w.TypeUrl)).Record(float64(configSize))
ptype := "PUSH"
info := ""
if logdata.Incremental {
ptype = "PUSH INC"
}
if len(logdata.AdditionalInfo) > 0 {
info = " " + logdata.AdditionalInfo
}
if len(logFiltered) > 0 {
info += logFiltered
}
if err := con.send(resp); err != nil {
if recordSendError(w.TypeUrl, err) {
log.Warnf("%s: Send failure for node:%s resources:%d size:%s%s: %v",
v3.GetShortType(w.TypeUrl), con.proxy.ID, len(res), util.ByteCount(configSize), info, err)
}
return err
}
switch {
case logdata.Incremental:
if log.DebugEnabled() {
log.Debugf("%s: %s%s for node:%s resources:%d size:%s%s",
v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res), util.ByteCount(configSize), info)
}
default:
debug := ""
if log.DebugEnabled() {
// Add additional information to logs when debug mode enabled.
debug = " nonce:" + resp.Nonce + " version:" + resp.VersionInfo
}
log.Infof("%s: %s%s for node:%s resources:%d size:%v%s%s", v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res),
util.ByteCount(ResourceSize(res)), info, debug)
}
return nil
}
func ResourceSize(r model.Resources) int {
// Approximate size by looking at the Any marshaled size. This avoids high cost
// proto.Size, at the expense of slightly under counting.
size := 0
for _, r := range r {
size += len(r.Resource.Value)
}
return size
}