// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds
import (
"encoding/json"
"fmt"
"html/template"
"net"
"net/http"
"net/http/pprof"
"sort"
"strings"
"time"

adminapi "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
wasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3"
tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/protobuf/proto"
any "google.golang.org/protobuf/types/known/anypb"
istiolog "istio.io/pkg/log"

"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/kube/crd"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collection"
"github.com/apache/dubbo-go-pixiu/pkg/config/xds"
"github.com/apache/dubbo-go-pixiu/pkg/network"
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
)
var indexTmpl = template.Must(template.New("index").Parse(`<html>
<head>
<title>Pilot Debug Console</title>
</head>
<style>
#endpoints {
font-family: "Trebuchet MS", Arial, Helvetica, sans-serif;
border-collapse: collapse;
}
#endpoints td, #endpoints th {
border: 1px solid #ddd;
padding: 8px;
}
#endpoints tr:nth-child(even){background-color: #f2f2f2;}
#endpoints tr:hover {background-color: #ddd;}
#endpoints th {
padding-top: 12px;
padding-bottom: 12px;
text-align: left;
background-color: black;
color: white;
}
</style>
<body>
<br/>
<table id="endpoints">
<tr><th>Endpoint</th><th>Description</th></tr>
{{range .}}
<tr>
<td><a href='{{.Href}}'>{{.Name}}</a></td><td>{{.Help}}</td>
</tr>
{{end}}
</table>
<br/>
</body>
</html>
`))
// AdsClient defines the data that is displayed on "/adsz" endpoint.
type AdsClient struct {
ConnectionID string `json:"connectionId"`
ConnectedAt time.Time `json:"connectedAt"`
PeerAddress string `json:"address"`
Metadata *model.NodeMetadata `json:"metadata,omitempty"`
Watches map[string][]string `json:"watches,omitempty"`
}
// AdsClients is a collection of AdsClients connected to this Istiod.
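// An illustrative response shape for /debug/adsz or /debug/connections (all values
// below are placeholders, not real output):
//
//	{
//	  "totalClients": 1,
//	  "clients": [{
//	    "connectionId": "sidecar~10.1.2.3~app-1234.default~default.svc.cluster.local-1",
//	    "connectedAt": "2022-01-01T00:00:00Z",
//	    "address": "10.1.2.3:43210",
//	    "watches": {"type.googleapis.com/envoy.config.cluster.v3.Cluster": []}
//	  }]
//	}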
type AdsClients struct {
Total int `json:"totalClients"`
Connected []AdsClient `json:"clients,omitempty"`
}
// SyncStatus is the synchronization status between Pilot and a given Envoy
type SyncStatus struct {
ClusterID string `json:"cluster_id,omitempty"`
ProxyID string `json:"proxy,omitempty"`
ProxyVersion string `json:"proxy_version,omitempty"`
IstioVersion string `json:"istio_version,omitempty"`
ClusterSent string `json:"cluster_sent,omitempty"`
ClusterAcked string `json:"cluster_acked,omitempty"`
ListenerSent string `json:"listener_sent,omitempty"`
ListenerAcked string `json:"listener_acked,omitempty"`
RouteSent string `json:"route_sent,omitempty"`
RouteAcked string `json:"route_acked,omitempty"`
EndpointSent string `json:"endpoint_sent,omitempty"`
EndpointAcked string `json:"endpoint_acked,omitempty"`
ExtensionConfigSent string `json:"extensionconfig_sent,omitempty"`
ExtensionConfigAcked string `json:"extensionconfig_acked,omitempty"`
}
// SyncedVersions shows what resourceVersion of a given resource has been acked by Envoy.
type SyncedVersions struct {
ProxyID string `json:"proxy,omitempty"`
ClusterVersion string `json:"cluster_acked,omitempty"`
ListenerVersion string `json:"listener_acked,omitempty"`
RouteVersion string `json:"route_acked,omitempty"`
}
// InitDebug initializes the debug handlers and adds a debug in-memory registry.
func (s *DiscoveryServer) InitDebug(mux *http.ServeMux, sctl *aggregate.Controller, enableProfiling bool,
fetchWebhook func() map[string]string) {
// For debugging and load testing v2 we add an in-memory registry.
s.MemRegistry = memory.NewServiceDiscovery()
s.MemRegistry.EDSUpdater = s
s.MemRegistry.ClusterID = "v2-debug"
sctl.AddRegistry(serviceregistry.Simple{
ClusterID: "v2-debug",
ProviderID: provider.Mock,
ServiceDiscovery: s.MemRegistry,
Controller: s.MemRegistry.Controller,
})
internalMux := http.NewServeMux()
s.AddDebugHandlers(mux, internalMux, enableProfiling, fetchWebhook)
debugGen, ok := (s.Generators[TypeDebug]).(*DebugGen)
if ok {
debugGen.DebugMux = internalMux
}
}
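// AddDebugHandlers registers the debug endpoints on both the authenticated external mux and the
// unauthenticated internal mux (the latter is only used internally, e.g. by the TypeDebug generator).
// Once registered they can be queried over HTTP, for example (illustrative commands; the monitoring
// port is typically 15014):
//
//	curl 'http://localhost:15014/debug/syncz'
//	curl 'http://localhost:15014/debug/config_dump?proxyID=<pod-name>.<namespace>'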
func (s *DiscoveryServer) AddDebugHandlers(mux, internalMux *http.ServeMux, enableProfiling bool, webhook func() map[string]string) {
// Debug handlers on HTTP ports are added for backward compatibility.
// They will be exposed on XDS-over-TLS in future releases.
if !features.EnableDebugOnHTTP {
return
}
if enableProfiling {
s.addDebugHandler(mux, internalMux, "/debug/pprof/", "Displays pprof index", pprof.Index)
s.addDebugHandler(mux, internalMux, "/debug/pprof/cmdline", "The command line invocation of the current program", pprof.Cmdline)
s.addDebugHandler(mux, internalMux, "/debug/pprof/profile", "CPU profile", pprof.Profile)
s.addDebugHandler(mux, internalMux, "/debug/pprof/symbol", "Symbol looks up the program counters listed in the request", pprof.Symbol)
s.addDebugHandler(mux, internalMux, "/debug/pprof/trace", "A trace of execution of the current program.", pprof.Trace)
}
mux.HandleFunc("/debug", s.Debug)
if features.EnableUnsafeAdminEndpoints {
s.addDebugHandler(mux, internalMux, "/debug/force_disconnect", "Disconnects a proxy from this Pilot", s.forceDisconnect)
}
s.addDebugHandler(mux, internalMux, "/debug/ecdsz", "Status and debug interface for ECDS", s.ecdsz)
s.addDebugHandler(mux, internalMux, "/debug/edsz", "Status and debug interface for EDS", s.Edsz)
s.addDebugHandler(mux, internalMux, "/debug/ndsz", "Status and debug interface for NDS", s.ndsz)
s.addDebugHandler(mux, internalMux, "/debug/adsz", "Status and debug interface for ADS", s.adsz)
s.addDebugHandler(mux, internalMux, "/debug/adsz?push=true", "Initiates push of the current state to all connected endpoints", s.adsz)
s.addDebugHandler(mux, internalMux, "/debug/syncz", "Synchronization status of all Envoys connected to this Pilot instance", s.Syncz)
s.addDebugHandler(mux, internalMux, "/debug/config_distribution", "Version status of all Envoys connected to this Pilot instance", s.distributedVersions)
s.addDebugHandler(mux, internalMux, "/debug/registryz", "Debug support for registry", s.registryz)
s.addDebugHandler(mux, internalMux, "/debug/endpointz", "Debug support for endpoints", s.endpointz)
s.addDebugHandler(mux, internalMux, "/debug/endpointShardz", "Info about the endpoint shards", s.endpointShardz)
s.addDebugHandler(mux, internalMux, "/debug/cachez", "Info about the internal XDS caches", s.cachez)
s.addDebugHandler(mux, internalMux, "/debug/cachez?sizes=true", "Info about the size of the internal XDS caches", s.cachez)
s.addDebugHandler(mux, internalMux, "/debug/cachez?clear=true", "Clear the XDS caches", s.cachez)
s.addDebugHandler(mux, internalMux, "/debug/configz", "Debug support for config", s.configz)
s.addDebugHandler(mux, internalMux, "/debug/sidecarz", "Debug sidecar scope for a proxy", s.sidecarz)
s.addDebugHandler(mux, internalMux, "/debug/resourcesz", "Debug support for watched resources", s.resourcez)
s.addDebugHandler(mux, internalMux, "/debug/instancesz", "Debug support for service instances", s.instancesz)
s.addDebugHandler(mux, internalMux, "/debug/authorizationz", "Internal authorization policies", s.authorizationz)
s.addDebugHandler(mux, internalMux, "/debug/telemetryz", "Debug Telemetry configuration", s.telemetryz)
s.addDebugHandler(mux, internalMux, "/debug/config_dump", "ConfigDump in the form of the Envoy admin config dump API for passed in proxyID", s.ConfigDump)
s.addDebugHandler(mux, internalMux, "/debug/push_status", "Last PushContext Details", s.pushStatusHandler)
s.addDebugHandler(mux, internalMux, "/debug/pushcontext", "Debug support for current push context", s.pushContextHandler)
s.addDebugHandler(mux, internalMux, "/debug/connections", "Info about the connected XDS clients", s.connectionsHandler)
s.addDebugHandler(mux, internalMux, "/debug/inject", "Active inject template", s.injectTemplateHandler(webhook))
s.addDebugHandler(mux, internalMux, "/debug/mesh", "Active mesh config", s.meshHandler)
s.addDebugHandler(mux, internalMux, "/debug/clusterz", "List remote clusters where istiod reads endpoints", s.clusterz)
s.addDebugHandler(mux, internalMux, "/debug/networkz", "List cross-network gateways", s.networkz)
s.addDebugHandler(mux, internalMux, "/debug/mcsz", "List information about Kubernetes MCS services", s.mcsz)
s.addDebugHandler(mux, internalMux, "/debug/list", "List all supported debug commands in json", s.List)
}
func (s *DiscoveryServer) addDebugHandler(mux *http.ServeMux, internalMux *http.ServeMux,
path string, help string, handler func(http.ResponseWriter, *http.Request)) {
s.debugHandlers[path] = help
// Add handler without auth. This mux is never exposed on an HTTP server and only used internally
if internalMux != nil {
internalMux.HandleFunc(path, handler)
}
// Add handler with auth; this is exposed on an HTTP server
mux.HandleFunc(path, s.allowAuthenticatedOrLocalhost(http.HandlerFunc(handler)))
}
func (s *DiscoveryServer) allowAuthenticatedOrLocalhost(next http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// Request is from localhost, no need to authenticate
if isRequestFromLocalhost(req) {
next.ServeHTTP(w, req)
return
}
// Authenticate request with the same method as XDS
authFailMsgs := make([]string, 0)
var ids []string
for _, authn := range s.Authenticators {
u, err := authn.AuthenticateRequest(req)
// If one authenticator passes, return
if u != nil && u.Identities != nil && err == nil {
ids = u.Identities
break
}
authFailMsgs = append(authFailMsgs, fmt.Sprintf("Authenticator %s: %v", authn.AuthenticatorType(), err))
}
if ids == nil {
istiolog.Errorf("Failed to authenticate %s %v", req.URL, authFailMsgs)
// Not including detailed info in the response, XDS doesn't either (returns a generic "authentication failure").
w.WriteHeader(http.StatusUnauthorized)
return
}
// TODO: Check that the identity contains dubbo-system namespace, else block or restrict to only info that
// is visible to the authenticated SA. Will require changes in docs and istioctl too.
next.ServeHTTP(w, req)
}
}
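// isRequestFromLocalhost reports whether the request came from a loopback address.
// For example, a RemoteAddr of "127.0.0.1:52134" or "[::1]:52134" is considered local,
// while any other peer address (or one that cannot be parsed) is not.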
func isRequestFromLocalhost(r *http.Request) bool {
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return false
}
userIP := net.ParseIP(ip)
return userIP.IsLoopback()
}
// Syncz dumps the synchronization status of all Envoys connected to this Pilot instance
func (s *DiscoveryServer) Syncz(w http.ResponseWriter, _ *http.Request) {
syncz := make([]SyncStatus, 0)
for _, con := range s.Clients() {
node := con.proxy
if node != nil {
syncz = append(syncz, SyncStatus{
ProxyID: node.ID,
ClusterID: node.Metadata.ClusterID.String(),
IstioVersion: node.Metadata.IstioVersion,
ClusterSent: con.NonceSent(v3.ClusterType),
ClusterAcked: con.NonceAcked(v3.ClusterType),
ListenerSent: con.NonceSent(v3.ListenerType),
ListenerAcked: con.NonceAcked(v3.ListenerType),
RouteSent: con.NonceSent(v3.RouteType),
RouteAcked: con.NonceAcked(v3.RouteType),
EndpointSent: con.NonceSent(v3.EndpointType),
EndpointAcked: con.NonceAcked(v3.EndpointType),
ExtensionConfigSent: con.NonceSent(v3.ExtensionConfigurationType),
ExtensionConfigAcked: con.NonceAcked(v3.ExtensionConfigurationType),
})
}
}
writeJSON(w, syncz)
}
// registryz provides debug support for registry - adding and listing model items.
// Can be combined with the push debug interface to reproduce changes.
func (s *DiscoveryServer) registryz(w http.ResponseWriter, req *http.Request) {
all := s.Env.ServiceDiscovery.Services()
writeJSON(w, all)
}
// Dumps info about the endpoint shards, tracked using the new direct interface.
// Legacy registry providers are synced to the new data structure as well, during
// the full push.
func (s *DiscoveryServer) endpointShardz(w http.ResponseWriter, req *http.Request) {
w.Header().Add("Content-Type", "application/json")
out, _ := json.MarshalIndent(s.EndpointIndex.Shardz(), " ", " ")
_, _ = w.Write(out)
}
func (s *DiscoveryServer) cachez(w http.ResponseWriter, req *http.Request) {
if err := req.ParseForm(); err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("Failed to parse request\n"))
return
}
if req.Form.Get("clear") != "" {
s.Cache.ClearAll()
_, _ = w.Write([]byte("Cache cleared\n"))
return
}
if req.Form.Get("sizes") != "" {
snapshot := s.Cache.Snapshot()
sizes := make(map[string]int, len(snapshot))
totalSize := 0
for _, resource := range snapshot {
if resource == nil {
continue
}
resourceType := resource.Resource.TypeUrl
sz := len(resource.Resource.GetValue())
// Sum raw byte counts per type and format once below, rather than
// concatenating pre-formatted size strings for each resource.
sizes[resourceType] += sz
totalSize += sz
}
res := make(map[string]string, len(sizes)+1)
for k, v := range sizes {
res[k] = util.ByteCount(v)
}
res["total"] = util.ByteCount(totalSize)
writeJSON(w, res)
return
}
snapshot := s.Cache.Snapshot()
resources := make(map[string][]string, len(snapshot)) // Key is typeUrl and value is resource names.
for key, resource := range snapshot {
if resource == nil {
continue
}
resourceType := resource.Resource.TypeUrl
resources[resourceType] = append(resources[resourceType], resource.Name+"/"+key)
}
writeJSON(w, resources)
}
type endpointzResponse struct {
Service string `json:"svc"`
Endpoints []*model.ServiceInstance `json:"ep"`
}
// Endpoint debugging
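// With the ?brief query parameter the handler writes one plain-text line per endpoint
// (hostname:port-name, address:port, labels, service account); otherwise it returns the full
// []*model.ServiceInstance list for every service port as JSON.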
func (s *DiscoveryServer) endpointz(w http.ResponseWriter, req *http.Request) {
if _, f := req.URL.Query()["brief"]; f {
svc := s.Env.ServiceDiscovery.Services()
for _, ss := range svc {
for _, p := range ss.Ports {
all := s.Env.ServiceDiscovery.InstancesByPort(ss, p.Port, nil)
for _, svc := range all {
_, _ = fmt.Fprintf(w, "%s:%s %s:%d %v %s\n", ss.Hostname,
p.Name, svc.Endpoint.Address, svc.Endpoint.EndpointPort, svc.Endpoint.Labels,
svc.Endpoint.ServiceAccount)
}
}
}
return
}
svc := s.Env.ServiceDiscovery.Services()
resp := make([]endpointzResponse, 0)
for _, ss := range svc {
for _, p := range ss.Ports {
all := s.Env.ServiceDiscovery.InstancesByPort(ss, p.Port, nil)
resp = append(resp, endpointzResponse{
Service: fmt.Sprintf("%s:%s", ss.Hostname, p.Name),
Endpoints: all,
})
}
}
writeJSON(w, resp)
}
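// distributedVersions implements /debug/config_distribution. The 'resource' query parameter naming
// the config resource to report on is required; the optional 'proxy_namespace' parameter restricts
// the report to proxies in that namespace. Distribution tracking must be enabled via
// PILOT_ENABLE_CONFIG_DISTRIBUTION_TRACKING.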
func (s *DiscoveryServer) distributedVersions(w http.ResponseWriter, req *http.Request) {
if !features.EnableDistributionTracking {
w.WriteHeader(http.StatusConflict)
_, _ = fmt.Fprint(w, "Pilot Version tracking is disabled. Please set the "+
"PILOT_ENABLE_CONFIG_DISTRIBUTION_TRACKING environment variable to true to enable.")
return
}
if resourceID := req.URL.Query().Get("resource"); resourceID != "" {
proxyNamespace := req.URL.Query().Get("proxy_namespace")
knownVersions := make(map[string]string)
var results []SyncedVersions
for _, con := range s.Clients() {
// wrap this in independent scope so that panics don't bypass Unlock...
con.proxy.RLock()
if con.proxy != nil && (proxyNamespace == "" || proxyNamespace == con.proxy.ConfigNamespace) {
// read nonces from our statusreporter to allow for skipped nonces, etc.
results = append(results, SyncedVersions{
ProxyID: con.proxy.ID,
ClusterVersion: s.getResourceVersion(s.StatusReporter.QueryLastNonce(con.conID, v3.ClusterType),
resourceID, knownVersions),
ListenerVersion: s.getResourceVersion(s.StatusReporter.QueryLastNonce(con.conID, v3.ListenerType),
resourceID, knownVersions),
RouteVersion: s.getResourceVersion(s.StatusReporter.QueryLastNonce(con.conID, v3.RouteType),
resourceID, knownVersions),
})
}
con.proxy.RUnlock()
}
writeJSON(w, results)
} else {
w.WriteHeader(http.StatusUnprocessableEntity)
_, _ = fmt.Fprintf(w, "querystring parameter 'resource' is required\n")
}
}
// VersionLen is the length of the config version prefix of a nonce. The config version is only
// used as the nonce prefix, but we can reconstruct it because it is a b64 encoding of a 64 bit
// array, which will always be 12 chars in length.
// len = ceil(bitlength/(2^6))+1
const VersionLen = 12
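// getResourceVersion resolves the config (ledger) version encoded in the first VersionLen
// characters of an xDS nonce to the resourceVersion of the given key. The per-request cache map
// is consulted first so that repeated lookups (and repeated failures) hit the ledger only once.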
func (s *DiscoveryServer) getResourceVersion(nonce, key string, cache map[string]string) string {
if len(nonce) < VersionLen {
return ""
}
configVersion := nonce[:VersionLen]
result, ok := cache[configVersion]
if !ok {
lookupResult, err := s.Env.GetLedger().GetPreviousValue(configVersion, key)
if err != nil {
istiolog.Errorf("Unable to retrieve resource %s at version %s: %v", key, configVersion, err)
lookupResult = ""
}
// Update the cache even on an error, because errors will not resolve themselves, and we don't want to
// repeat the same failing lookup for every connected client.
cache[configVersion] = lookupResult
return lookupResult
}
return result
}
// kubernetesConfig wraps a config.Config with a custom marshaling method that matches a Kubernetes
// object structure.
type kubernetesConfig struct {
config.Config
}
func (k kubernetesConfig) MarshalJSON() ([]byte, error) {
cfg, err := crd.ConvertConfig(k.Config)
if err != nil {
return nil, err
}
return json.Marshal(cfg)
}
// Config debugging.
func (s *DiscoveryServer) configz(w http.ResponseWriter, req *http.Request) {
configs := make([]kubernetesConfig, 0)
s.Env.ConfigStore.Schemas().ForEach(func(schema collection.Schema) bool {
cfg, _ := s.Env.ConfigStore.List(schema.Resource().GroupVersionKind(), "")
for _, c := range cfg {
configs = append(configs, kubernetesConfig{c})
}
return false
})
writeJSON(w, configs)
}
// SidecarScope debugging
func (s *DiscoveryServer) sidecarz(w http.ResponseWriter, req *http.Request) {
proxyID, con := s.getDebugConnection(req)
if con == nil {
s.errorHandler(w, proxyID, con)
return
}
writeJSON(w, con.proxy.SidecarScope)
}
// Resource debugging.
func (s *DiscoveryServer) resourcez(w http.ResponseWriter, _ *http.Request) {
schemas := make([]config.GroupVersionKind, 0)
s.Env.Schemas().ForEach(func(schema collection.Schema) bool {
schemas = append(schemas, schema.Resource().GroupVersionKind())
return false
})
writeJSON(w, schemas)
}
// AuthorizationDebug holds debug information for authorization policy.
type AuthorizationDebug struct {
AuthorizationPolicies *model.AuthorizationPolicies `json:"authorization_policies"`
}
// authorizationz dumps the internal authorization policies.
func (s *DiscoveryServer) authorizationz(w http.ResponseWriter, req *http.Request) {
info := AuthorizationDebug{
AuthorizationPolicies: s.globalPushContext().AuthzPolicies,
}
writeJSON(w, info)
}
// TelemetryDebug holds debug information for Telemetry configuration.
type TelemetryDebug struct {
Telemetries *model.Telemetries `json:"telemetries"`
}
func (s *DiscoveryServer) telemetryz(w http.ResponseWriter, req *http.Request) {
info := TelemetryDebug{
Telemetries: s.globalPushContext().Telemetry,
}
writeJSON(w, info)
}
// connectionsHandler implements interface for displaying current connections.
// It is mapped to /debug/connections.
func (s *DiscoveryServer) connectionsHandler(w http.ResponseWriter, req *http.Request) {
adsClients := &AdsClients{}
connections := s.Clients()
adsClients.Total = len(connections)
for _, c := range connections {
adsClient := AdsClient{
ConnectionID: c.conID,
ConnectedAt: c.connectedAt,
PeerAddress: c.peerAddr,
}
adsClients.Connected = append(adsClients.Connected, adsClient)
}
writeJSON(w, adsClients)
}
// adsz implements a status and debug interface for ADS.
// It is mapped to /debug/adsz
func (s *DiscoveryServer) adsz(w http.ResponseWriter, req *http.Request) {
if s.handlePushRequest(w, req) {
return
}
proxyID, con := s.getDebugConnection(req)
if proxyID != "" && con == nil {
// We can't guarantee the Pilot we are connected to has a connection to the proxy we requested.
// There isn't a great way around this, but for debugging purposes it's suitable to have the caller retry.
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte("Proxy not connected to this Pilot instance. It may be connected to another instance.\n"))
return
}
var connections []*Connection
if con != nil {
connections = []*Connection{con}
} else {
connections = s.Clients()
}
adsClients := &AdsClients{}
adsClients.Total = len(connections)
for _, c := range connections {
adsClient := AdsClient{
ConnectionID: c.conID,
ConnectedAt: c.connectedAt,
PeerAddress: c.peerAddr,
Metadata: c.proxy.Metadata,
Watches: map[string][]string{},
}
c.proxy.RLock()
for k, wr := range c.proxy.WatchedResources {
r := wr.ResourceNames
if r == nil {
r = []string{}
}
adsClient.Watches[k] = r
}
c.proxy.RUnlock()
adsClients.Connected = append(adsClients.Connected, adsClient)
}
sort.Slice(adsClients.Connected, func(i, j int) bool {
return adsClients.Connected[i].ConnectionID < adsClients.Connected[j].ConnectionID
})
writeJSON(w, adsClients)
}
// ecdsz implements a status and debug interface for ECDS.
// It is mapped to /debug/ecdsz
func (s *DiscoveryServer) ecdsz(w http.ResponseWriter, req *http.Request) {
if s.handlePushRequest(w, req) {
return
}
proxyID, con := s.getDebugConnection(req)
if con == nil {
s.errorHandler(w, proxyID, con)
return
}
if s.Generators[v3.ExtensionConfigurationType] != nil {
r, ok := con.proxy.WatchedResources[v3.ExtensionConfigurationType]
if !ok {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(fmt.Sprintf("no watched ExtensionConfigurationType found, proxyID: %s\n", proxyID)))
return
}
resource, _, _ := s.Generators[v3.ExtensionConfigurationType].Generate(con.proxy, r, &model.PushRequest{
Full: true,
Push: con.proxy.LastPushContext,
})
if len(resource) == 0 {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(fmt.Sprintf("ExtensionConfigurationType not found, proxyID: %s\n", proxyID)))
return
}
wasmCfgs := make([]interface{}, 0, len(resource))
for _, rr := range resource {
if wasmCfg, err := unmarshalToWasm(rr); err != nil {
istiolog.Warnf("failed to unmarshal wasm: %v", err)
} else {
wasmCfgs = append(wasmCfgs, wasmCfg)
}
}
writeJSON(w, wasmCfgs)
}
}
func unmarshalToWasm(r *discovery.Resource) (interface{}, error) {
tce := &core.TypedExtensionConfig{}
if err := r.GetResource().UnmarshalTo(tce); err != nil {
return nil, err
}
switch tce.TypedConfig.TypeUrl {
case xds.WasmHTTPFilterType:
w := &wasm.Wasm{}
if err := tce.TypedConfig.UnmarshalTo(w); err != nil {
return nil, err
}
// Redact Wasm secret env variable.
vmenvs := w.GetConfig().GetVmConfig().EnvironmentVariables
if vmenvs != nil {
if _, found := vmenvs.KeyValues[model.WasmSecretEnv]; found {
vmenvs.KeyValues[model.WasmSecretEnv] = "<Redacted>"
}
}
return w, nil
}
return tce, nil
}
// ConfigDump returns information in the form of the Envoy admin API config dump for the specified proxy
// The dump will only contain dynamic listeners/clusters/routes and can be used to compare what an Envoy instance
// should look like according to Pilot vs what it currently does look like.
func (s *DiscoveryServer) ConfigDump(w http.ResponseWriter, req *http.Request) {
proxyID, con := s.getDebugConnection(req)
if con == nil {
s.errorHandler(w, proxyID, con)
return
}
dump, err := s.configDump(con)
if err != nil {
handleHTTPError(w, err)
return
}
writeJSON(w, dump)
}
// configDump converts the connection internal state into an Envoy Admin API config dump proto
// It is used in debugging to create a consistent object for comparison between Envoy and Pilot outputs
func (s *DiscoveryServer) configDump(conn *Connection) (*adminapi.ConfigDump, error) {
dynamicActiveClusters := make([]*adminapi.ClustersConfigDump_DynamicCluster, 0)
req := &model.PushRequest{Push: conn.proxy.LastPushContext, Start: time.Now()}
clusters, _ := s.ConfigGenerator.BuildClusters(conn.proxy, req)
for _, cs := range clusters {
dynamicActiveClusters = append(dynamicActiveClusters, &adminapi.ClustersConfigDump_DynamicCluster{Cluster: cs.Resource})
}
clustersAny, err := util.MessageToAnyWithError(&adminapi.ClustersConfigDump{
VersionInfo: versionInfo(),
DynamicActiveClusters: dynamicActiveClusters,
})
if err != nil {
return nil, err
}
dynamicActiveListeners := make([]*adminapi.ListenersConfigDump_DynamicListener, 0)
listeners := s.ConfigGenerator.BuildListeners(conn.proxy, req.Push)
for _, cs := range listeners {
listener, err := any.New(cs)
if err != nil {
return nil, err
}
dynamicActiveListeners = append(dynamicActiveListeners, &adminapi.ListenersConfigDump_DynamicListener{
Name: cs.Name,
ActiveState: &adminapi.ListenersConfigDump_DynamicListenerState{Listener: listener},
})
}
listenersAny, err := util.MessageToAnyWithError(&adminapi.ListenersConfigDump{
VersionInfo: versionInfo(),
DynamicListeners: dynamicActiveListeners,
})
if err != nil {
return nil, err
}
routes, _ := s.ConfigGenerator.BuildHTTPRoutes(conn.proxy, req, conn.Routes())
routeConfigAny := util.MessageToAny(&adminapi.RoutesConfigDump{})
if len(routes) > 0 {
dynamicRouteConfig := make([]*adminapi.RoutesConfigDump_DynamicRouteConfig, 0)
for _, rs := range routes {
dynamicRouteConfig = append(dynamicRouteConfig, &adminapi.RoutesConfigDump_DynamicRouteConfig{RouteConfig: rs.Resource})
}
routeConfigAny, err = util.MessageToAnyWithError(&adminapi.RoutesConfigDump{DynamicRouteConfigs: dynamicRouteConfig})
if err != nil {
return nil, err
}
}
secretsDump := &adminapi.SecretsConfigDump{}
if s.Generators[v3.SecretType] != nil {
secrets, _, _ := s.Generators[v3.SecretType].Generate(conn.proxy, conn.Watched(v3.SecretType), nil)
if len(secrets) > 0 {
for _, secretAny := range secrets {
secret := &tls.Secret{}
if err := secretAny.GetResource().UnmarshalTo(secret); err != nil {
istiolog.Warnf("failed to unmarshal secret: %v", err)
}
if secret.GetTlsCertificate() != nil {
secret.GetTlsCertificate().PrivateKey = &core.DataSource{
Specifier: &core.DataSource_InlineBytes{
InlineBytes: []byte("[redacted]"),
},
}
}
secretsDump.DynamicActiveSecrets = append(secretsDump.DynamicActiveSecrets, &adminapi.SecretsConfigDump_DynamicSecret{
Name: secret.Name,
Secret: util.MessageToAny(secret),
})
}
}
}
bootstrapAny := util.MessageToAny(&adminapi.BootstrapConfigDump{})
scopedRoutesAny := util.MessageToAny(&adminapi.ScopedRoutesConfigDump{})
// The config dump must have all configs with connections specified in
// https://www.envoyproxy.io/docs/envoy/latest/api-v2/admin/v2alpha/config_dump.proto
configDump := &adminapi.ConfigDump{
Configs: []*any.Any{
bootstrapAny,
clustersAny, listenersAny,
scopedRoutesAny,
routeConfigAny,
util.MessageToAny(secretsDump),
},
}
return configDump, nil
}
// injectTemplateHandler dumps the injection template
// Replaces dumping the template at startup.
func (s *DiscoveryServer) injectTemplateHandler(webhook func() map[string]string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
// TODO: we should split the inject template into smaller modules (separate one for dump core, etc),
// and allow pods to select which patches will be applied. When this happens, this should return
// all inject templates or take a param to select one.
if webhook == nil {
w.WriteHeader(http.StatusNotFound)
return
}
writeJSON(w, webhook())
}
}
// meshHandler dumps the mesh config
func (s *DiscoveryServer) meshHandler(w http.ResponseWriter, r *http.Request) {
writeJSON(w, s.Env.Mesh())
}
// pushStatusHandler dumps the last PushContext
func (s *DiscoveryServer) pushStatusHandler(w http.ResponseWriter, req *http.Request) {
model.LastPushMutex.Lock()
defer model.LastPushMutex.Unlock()
if model.LastPushStatus == nil {
return
}
out, err := model.LastPushStatus.StatusJSON()
if err != nil {
handleHTTPError(w, err)
return
}
w.Header().Add("Content-Type", "application/json")
_, _ = w.Write(out)
}
// PushContextDebug holds debug information for push context.
type PushContextDebug struct {
AuthorizationPolicies *model.AuthorizationPolicies
NetworkGateways map[network.ID][]model.NetworkGateway
}
// pushContextHandler dumps the current PushContext
func (s *DiscoveryServer) pushContextHandler(w http.ResponseWriter, _ *http.Request) {
push := PushContextDebug{
AuthorizationPolicies: s.globalPushContext().AuthzPolicies,
NetworkGateways: s.globalPushContext().NetworkManager().GatewaysByNetwork(),
}
writeJSON(w, push)
}
// Debug lists all the supported debug endpoints.
func (s *DiscoveryServer) Debug(w http.ResponseWriter, req *http.Request) {
type debugEndpoint struct {
Name string
Href string
Help string
}
var deps []debugEndpoint
for k, v := range s.debugHandlers {
deps = append(deps, debugEndpoint{
Name: k,
Href: k,
Help: v,
})
}
sort.Slice(deps, func(i, j int) bool {
return deps[i].Name < deps[j].Name
})
if err := indexTmpl.Execute(w, deps); err != nil {
istiolog.Errorf("Error in rendering index template %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
}
// List lists all the supported debug commands in JSON.
func (s *DiscoveryServer) List(w http.ResponseWriter, req *http.Request) {
var cmdNames []string
for k := range s.debugHandlers {
key := strings.Replace(k, "/debug/", "", -1)
// exclude current list command
if key == "list" {
continue
}
// cannot support pprof commands
if strings.Contains(key, "pprof") {
continue
}
cmdNames = append(cmdNames, key)
}
sort.Strings(cmdNames)
writeJSON(w, cmdNames)
}
// ndsz implements a status and debug interface for NDS.
// It is mapped to /debug/ndsz on the monitor port (15014).
func (s *DiscoveryServer) ndsz(w http.ResponseWriter, req *http.Request) {
if s.handlePushRequest(w, req) {
return
}
proxyID, con := s.getDebugConnection(req)
if con == nil {
s.errorHandler(w, proxyID, con)
return
}
if !con.proxy.Metadata.DNSCapture {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("DNS capture is not enabled in the proxy\n"))
return
}
if s.Generators[v3.NameTableType] != nil {
nds, _, _ := s.Generators[v3.NameTableType].Generate(con.proxy, nil, &model.PushRequest{
Push: con.proxy.LastPushContext,
})
if len(nds) == 0 {
return
}
writeJSON(w, nds[0])
}
}
// Edsz implements a status and debug interface for EDS.
// It is mapped to /debug/edsz on the monitor port (15014).
func (s *DiscoveryServer) Edsz(w http.ResponseWriter, req *http.Request) {
if s.handlePushRequest(w, req) {
return
}
proxyID, con := s.getDebugConnection(req)
if con == nil {
s.errorHandler(w, proxyID, con)
return
}
clusters := con.Clusters()
eps := make([]jsonMarshalProto, 0, len(clusters))
for _, clusterName := range clusters {
eps = append(eps, jsonMarshalProto{s.generateEndpoints(NewEndpointBuilder(clusterName, con.proxy, con.proxy.LastPushContext))})
}
writeJSON(w, eps)
}
func (s *DiscoveryServer) forceDisconnect(w http.ResponseWriter, req *http.Request) {
proxyID, con := s.getDebugConnection(req)
if con == nil {
s.errorHandler(w, proxyID, con)
return
}
con.Stop()
_, _ = w.Write([]byte("OK"))
}
func (s *DiscoveryServer) getProxyConnection(proxyID string) *Connection {
for _, con := range s.Clients() {
if strings.Contains(con.conID, proxyID) {
return con
}
}
return nil
}
func (s *DiscoveryServer) instancesz(w http.ResponseWriter, req *http.Request) {
instances := map[string][]*model.ServiceInstance{}
for _, con := range s.Clients() {
con.proxy.RLock()
if con.proxy != nil {
instances[con.proxy.ID] = con.proxy.ServiceInstances
}
con.proxy.RUnlock()
}
writeJSON(w, instances)
}
func (s *DiscoveryServer) networkz(w http.ResponseWriter, _ *http.Request) {
writeJSON(w, s.Env.NetworkManager.AllGateways())
}
func (s *DiscoveryServer) mcsz(w http.ResponseWriter, _ *http.Request) {
svcs := sortMCSServices(s.Env.MCSServices())
writeJSON(w, svcs)
}
func sortMCSServices(svcs []model.MCSServiceInfo) []model.MCSServiceInfo {
sort.Slice(svcs, func(i, j int) bool {
if strings.Compare(svcs[i].Cluster.String(), svcs[j].Cluster.String()) < 0 {
return true
}
if strings.Compare(svcs[i].Namespace, svcs[j].Namespace) < 0 {
return true
}
return strings.Compare(svcs[i].Name, svcs[j].Name) < 0
})
return svcs
}
func (s *DiscoveryServer) clusterz(w http.ResponseWriter, _ *http.Request) {
if s.ListRemoteClusters == nil {
w.WriteHeader(http.StatusBadRequest)
return
}
writeJSON(w, s.ListRemoteClusters())
}
// handlePushRequest handles a ?push=true query param and triggers a push.
// It returns true when the request has been fully handled and the caller should not continue.
func (s *DiscoveryServer) handlePushRequest(w http.ResponseWriter, req *http.Request) bool {
if err := req.ParseForm(); err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("Failed to parse request\n"))
return true
}
if req.Form.Get("push") != "" {
AdsPushAll(s)
_, _ = fmt.Fprintf(w, "Pushed to %d servers\n", s.adsClientCount())
return true
}
return false
}
// getDebugConnection fetches the Connection requested by proxyID
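// The proxyID is matched as a substring of the connection ID, so callers can pass either the full
// node ID or, typically, just "<pod-name>.<namespace>".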
func (s *DiscoveryServer) getDebugConnection(req *http.Request) (string, *Connection) {
if proxyID := req.URL.Query().Get("proxyID"); proxyID != "" {
return proxyID, s.getProxyConnection(proxyID)
}
return "", nil
}
func (s *DiscoveryServer) errorHandler(w http.ResponseWriter, proxyID string, con *Connection) {
if proxyID == "" {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("You must provide a proxyID in the query string\n"))
return
}
if con == nil {
// We can't guarantee the Pilot we are connected to has a connection to the proxy we requested.
// There isn't a great way around this, but for debugging purposes it's suitable to have the caller retry.
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte("Proxy not connected to this Pilot instance. It may be connected to another instance.\n"))
return
}
}
// jsonMarshalProto wraps a proto.Message so it can be marshaled with the standard encoding/json library
type jsonMarshalProto struct {
proto.Message
}
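// MarshalJSON implements json.Marshaler. Protobuf messages (especially ones embedding anypb.Any)
// are not rendered correctly by encoding/json, so marshaling is delegated to protomarshal, which
// uses the protobuf JSON encoding.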
func (p jsonMarshalProto) MarshalJSON() ([]byte, error) {
return protomarshal.Marshal(p.Message)
}
// writeJSON writes a json payload, handling content type, marshaling, and errors
func writeJSON(w http.ResponseWriter, obj interface{}) {
w.Header().Set("Content-Type", "application/json")
b, err := config.ToJSON(obj)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
_, err = w.Write(b)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}
// handleHTTPError writes an error message to the response
func handleHTTPError(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
}