// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
	"sort"
	"strings"
	"sync"
	"time"

	listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	httpwasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3"
	hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
	wasmfilter "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/wasm/v3"
	wasm "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3"
	"google.golang.org/protobuf/types/known/anypb"
	"google.golang.org/protobuf/types/known/durationpb"
	wrappers "google.golang.org/protobuf/types/known/wrapperspb"
	sd "istio.io/api/envoy/extensions/stackdriver/config/v1alpha1"
	"istio.io/api/envoy/extensions/stats"
	meshconfig "istio.io/api/mesh/v1alpha1"
	tpb "istio.io/api/telemetry/v1alpha1"
	istiolog "istio.io/pkg/log"

	"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking"
	"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
	"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
	"github.com/apache/dubbo-go-pixiu/pkg/config/xds"
	"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
	"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
var telemetryLog = istiolog.RegisterScope("telemetry", "Istio Telemetry", 0)
// Telemetry holds configuration for Telemetry API resources.
type Telemetry struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Spec *tpb.Telemetry `json:"spec"`
}
// Telemetries organizes Telemetry configuration by namespace.
type Telemetries struct {
// Maps from namespace to the Telemetry configs.
NamespaceToTelemetries map[string][]Telemetry `json:"namespace_to_telemetries"`
// The name of the root namespace.
RootNamespace string `json:"root_namespace"`
// Computed meshConfig
meshConfig *meshconfig.MeshConfig
// computedMetricsFilters contains the set of cached HCM/listener filters for the metrics portion.
// These filters are extremely costly, as we insert them into every listener on every proxy, and to
// generate them we need to merge many telemetry specs and perform 2 Any marshals.
// To improve performance, we store a cache based on the Telemetries that impacted the filter, as well as
// its class and protocol. This is protected by mu.
// Currently, this only applies to metrics, but a similar concept can likely be applied to logging and
// tracing for performance.
// The computedMetricsFilters lifetime is bound to the Telemetries object. During a push context
// creation, we will preserve the Telemetries (and thus the cache) if no Telemetries are modified.
// As a result, this cache will live until any Telemetry is modified.
computedMetricsFilters map[metricsKey]interface{}
mu sync.Mutex
}
// telemetryKey defines a key into the computedMetricsFilters cache.
type telemetryKey struct {
// Root stores the Telemetry in the root namespace, if any
Root NamespacedName
// Namespace stores the Telemetry in the proxy's namespace, if any
Namespace NamespacedName
// Workload stores the Telemetry whose selector matches the proxy's workload, if any
Workload NamespacedName
}
// metricsKey defines a key into the computedMetricsFilters cache.
type metricsKey struct {
telemetryKey
Class networking.ListenerClass
Protocol networking.ListenerProtocol
}
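// Illustrative sketch (assumptions, not production flow): filters are cached per
// combination of contributing Telemetry resources, listener class, and protocol, so
// proxies covered by the same Telemetries reuse one computed filter set. Here
// "computed" stands for a hypothetical computedTelemetries value and "t" for a *Telemetries:
//
//	key := metricsKey{
//		telemetryKey: computed.telemetryKey,
//		Class:        networking.ListenerClassSidecarOutbound,
//		Protocol:     networking.ListenerProtocolHTTP,
//	}
//	filters, ok := t.computedMetricsFilters[key] // a cache hit avoids re-merging and re-marshaling
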
// getTelemetries returns the Telemetry configurations for the given environment.
func getTelemetries(env *Environment) (*Telemetries, error) {
telemetries := &Telemetries{
NamespaceToTelemetries: map[string][]Telemetry{},
RootNamespace: env.Mesh().GetRootNamespace(),
meshConfig: env.Mesh(),
computedMetricsFilters: map[metricsKey]interface{}{},
}
fromEnv, err := env.List(collections.IstioTelemetryV1Alpha1Telemetries.Resource().GroupVersionKind(), NamespaceAll)
if err != nil {
return nil, err
}
sortConfigByCreationTime(fromEnv)
for _, config := range fromEnv {
telemetry := Telemetry{
Name: config.Name,
Namespace: config.Namespace,
Spec: config.Spec.(*tpb.Telemetry),
}
telemetries.NamespaceToTelemetries[config.Namespace] = append(telemetries.NamespaceToTelemetries[config.Namespace], telemetry)
}
return telemetries, nil
}
type metricsConfig struct {
ClientMetrics []metricsOverride
ServerMetrics []metricsOverride
}
type telemetryFilterConfig struct {
metricsConfig
Provider *meshconfig.MeshConfig_ExtensionProvider
Metrics bool
AccessLogging bool
LogsFilter *tpb.AccessLogging_Filter
}
func (t telemetryFilterConfig) MetricsForClass(c networking.ListenerClass) []metricsOverride {
switch c {
case networking.ListenerClassGateway:
return t.ClientMetrics
case networking.ListenerClassSidecarInbound:
return t.ServerMetrics
case networking.ListenerClassSidecarOutbound:
return t.ClientMetrics
default:
return t.ClientMetrics
}
}
type metricsOverride struct {
Name string
Disabled bool
Tags []tagOverride
}
type tagOverride struct {
Name string
Remove bool
Value string
}
// computedTelemetries contains the various Telemetry configurations in scope for a given proxy.
// This can include the root namespace, namespace, and workload Telemetries, combined.
type computedTelemetries struct {
telemetryKey
Metrics []*tpb.Metrics
Logging []*tpb.AccessLogging
Tracing []*tpb.Tracing
}
type TracingConfig struct {
ServerSpec TracingSpec
ClientSpec TracingSpec
}
type TracingSpec struct {
Provider *meshconfig.MeshConfig_ExtensionProvider
Disabled bool
RandomSamplingPercentage float64
CustomTags map[string]*tpb.Tracing_CustomTag
UseRequestIDForTraceSampling bool
}
type LoggingConfig struct {
Providers []*meshconfig.MeshConfig_ExtensionProvider
Filter *tpb.AccessLogging_Filter
}
func workloadMode(class networking.ListenerClass) tpb.WorkloadMode {
switch class {
case networking.ListenerClassGateway:
return tpb.WorkloadMode_CLIENT
case networking.ListenerClassSidecarInbound:
return tpb.WorkloadMode_SERVER
case networking.ListenerClassSidecarOutbound:
return tpb.WorkloadMode_CLIENT
case networking.ListenerClassUndefined:
// this should not happen; handle it just in case
return tpb.WorkloadMode_CLIENT
}
return tpb.WorkloadMode_CLIENT
}
// AccessLogging returns the logging configuration for a given proxy and listener class.
// If nil is returned, access logs are not configured via Telemetry and should use fallback mechanisms.
// If a non-nil but empty configuration is returned, access logging is explicitly disabled.
func (t *Telemetries) AccessLogging(proxy *Proxy, class networking.ListenerClass) *LoggingConfig {
ct := t.applicableTelemetries(proxy)
if len(ct.Logging) == 0 && len(t.meshConfig.GetDefaultProviders().GetAccessLogging()) == 0 {
return nil
}
cfg := LoggingConfig{}
providers, f := mergeLogs(ct.Logging, t.meshConfig, workloadMode(class))
cfg.Filter = f
for _, p := range providers.SortedList() {
fp := t.fetchProvider(p)
if fp != nil {
cfg.Providers = append(cfg.Providers, fp)
}
}
return &cfg
}
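// Hedged usage sketch: interpreting the AccessLogging result. The variables
// (telemetries, proxy) are illustrative assumptions, not defined in this file.
//
//	cfg := telemetries.AccessLogging(proxy, networking.ListenerClassSidecarInbound)
//	switch {
//	case cfg == nil:
//		// Not configured via Telemetry; fall back to mesh-wide access log settings.
//	case len(cfg.Providers) == 0:
//		// Access logging explicitly disabled by a Telemetry resource.
//	default:
//		// Configure access logs for each provider in cfg.Providers, honoring cfg.Filter.
//	}
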
// Tracing returns the tracing configuration for a given proxy. If nil is returned, tracing
// is not configured via Telemetry and should use fallback mechanisms. If a non-nil but disabled
// configuration is returned, then tracing is explicitly disabled.
func (t *Telemetries) Tracing(proxy *Proxy) *TracingConfig {
ct := t.applicableTelemetries(proxy)
providerNames := t.meshConfig.GetDefaultProviders().GetTracing()
hasDefaultProvider := len(providerNames) > 0
if len(ct.Tracing) == 0 && !hasDefaultProvider {
return nil
}
clientSpec := TracingSpec{UseRequestIDForTraceSampling: true}
serverSpec := TracingSpec{UseRequestIDForTraceSampling: true}
if hasDefaultProvider {
// todo: what do we want to do with more than one default provider?
// for now, use only the first provider.
fetched := t.fetchProvider(providerNames[0])
clientSpec.Provider = fetched
serverSpec.Provider = fetched
}
for _, m := range ct.Tracing {
names := getProviderNames(m.Providers)
specs := []*TracingSpec{&clientSpec, &serverSpec}
if m.Match != nil {
switch m.Match.Mode {
case tpb.WorkloadMode_CLIENT:
specs = []*TracingSpec{&clientSpec}
case tpb.WorkloadMode_SERVER:
specs = []*TracingSpec{&serverSpec}
}
}
if len(names) > 0 {
// NOTE: we only support a single provider per mode
// so, choosing the first provider returned in the list
// is the "safest"
fetched := t.fetchProvider(names[0])
for _, spec := range specs {
spec.Provider = fetched
}
}
// Now merge in any overrides
if m.DisableSpanReporting != nil {
for _, spec := range specs {
spec.Disabled = m.DisableSpanReporting.GetValue()
}
}
// TODO: metrics overrides do a deep merge, but here we do a shallow merge.
// We should consider if we want to reconcile the two.
if m.CustomTags != nil {
for _, spec := range specs {
spec.CustomTags = m.CustomTags
}
}
if m.RandomSamplingPercentage != nil {
for _, spec := range specs {
spec.RandomSamplingPercentage = m.RandomSamplingPercentage.GetValue()
}
}
if m.UseRequestIdForTraceSampling != nil {
for _, spec := range specs {
spec.UseRequestIDForTraceSampling = m.UseRequestIdForTraceSampling.Value
}
}
}
// If no provider is configured (and retrieved) for the tracing specs,
// then we will disable the configuration.
if clientSpec.Provider == nil {
clientSpec.Disabled = true
}
if serverSpec.Provider == nil {
serverSpec.Disabled = true
}
cfg := TracingConfig{
ClientSpec: clientSpec,
ServerSpec: serverSpec,
}
return &cfg
}
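// Hedged usage sketch: consuming the TracingConfig. A nil result means tracing was
// not configured via Telemetry; otherwise each spec carries its own provider,
// sampling percentage, and disable flag. applyTracing is a hypothetical helper.
//
//	tc := telemetries.Tracing(proxy)
//	if tc != nil && !tc.ClientSpec.Disabled {
//		applyTracing(tc.ClientSpec.Provider, tc.ClientSpec.RandomSamplingPercentage, tc.ClientSpec.CustomTags)
//	}
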
// HTTPFilters computes the HTTP filters for a given proxy/class
func (t *Telemetries) HTTPFilters(proxy *Proxy, class networking.ListenerClass) []*hcm.HttpFilter {
if res := t.telemetryFilters(proxy, class, networking.ListenerProtocolHTTP); res != nil {
return res.([]*hcm.HttpFilter)
}
return nil
}
// TCPFilters computes the TCP filters for a given proxy/class
func (t *Telemetries) TCPFilters(proxy *Proxy, class networking.ListenerClass) []*listener.Filter {
if res := t.telemetryFilters(proxy, class, networking.ListenerProtocolTCP); res != nil {
return res.([]*listener.Filter)
}
return nil
}
// applicableTelemetries fetches the relevant telemetry configurations for a given proxy
func (t *Telemetries) applicableTelemetries(proxy *Proxy) computedTelemetries {
if t == nil {
return computedTelemetries{}
}
namespace := proxy.ConfigNamespace
// Order here matters. Later elements override earlier elements.
ms := []*tpb.Metrics{}
ls := []*tpb.AccessLogging{}
ts := []*tpb.Tracing{}
key := telemetryKey{}
if t.RootNamespace != "" {
telemetry := t.namespaceWideTelemetryConfig(t.RootNamespace)
if telemetry != (Telemetry{}) {
key.Root = NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace}
ms = append(ms, telemetry.Spec.GetMetrics()...)
ls = append(ls, telemetry.Spec.GetAccessLogging()...)
ts = append(ts, telemetry.Spec.GetTracing()...)
}
}
if namespace != t.RootNamespace {
telemetry := t.namespaceWideTelemetryConfig(namespace)
if telemetry != (Telemetry{}) {
key.Namespace = NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace}
ms = append(ms, telemetry.Spec.GetMetrics()...)
ls = append(ls, telemetry.Spec.GetAccessLogging()...)
ts = append(ts, telemetry.Spec.GetTracing()...)
}
}
for _, telemetry := range t.NamespaceToTelemetries[namespace] {
spec := telemetry.Spec
if len(spec.GetSelector().GetMatchLabels()) == 0 {
continue
}
selector := labels.Instance(spec.GetSelector().GetMatchLabels())
if selector.SubsetOf(proxy.Metadata.Labels) {
key.Workload = NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace}
ms = append(ms, spec.GetMetrics()...)
ls = append(ls, spec.GetAccessLogging()...)
ts = append(ts, spec.GetTracing()...)
break
}
}
return computedTelemetries{
telemetryKey: key,
Metrics: ms,
Logging: ls,
Tracing: ts,
}
}
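// Illustrative precedence sketch (resource names are assumptions): given Telemetry
// "root-default" in the root namespace, "ns-default" in the proxy's namespace, and
// "workload" whose selector matches the proxy's labels, the specs are appended in
// exactly that order, so entries merged later (workload level) take precedence:
//
//	ct := t.applicableTelemetries(proxy)
//	// ct.Metrics = [root-default..., ns-default..., workload...]
//	// ct.telemetryKey records which of the three resources contributed.
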
// telemetryFilters computes the filters for the given proxy/class and protocol. This computes the
// set of applicable Telemetries, merges them, then translates to the appropriate filters based on the
// extension providers in the mesh config. Where possible, the result is cached.
// Currently, this includes metrics and access logging, as some providers are implemented in filters.
func (t *Telemetries) telemetryFilters(proxy *Proxy, class networking.ListenerClass, protocol networking.ListenerProtocol) interface{} {
if t == nil {
return nil
}
c := t.applicableTelemetries(proxy)
key := metricsKey{
telemetryKey: c.telemetryKey,
Class: class,
Protocol: protocol,
}
t.mu.Lock()
defer t.mu.Unlock()
precomputed, f := t.computedMetricsFilters[key]
if f {
return precomputed
}
// First, take all the metrics configs and transform them into a normalized form
tmm := mergeMetrics(c.Metrics, t.meshConfig)
log.Debugf("merged metrics, proxyID: %s metrics: %+v", proxy.ID, tmm)
// Additionally, fetch relevant access logging configurations
tml, logsFilter := mergeLogs(c.Logging, t.meshConfig, workloadMode(class))
// The above result is in a nested map to deduplicate responses. This loses ordering, so we convert to
// a list to retain stable naming
allKeys := sets.New(tml.UnsortedList()...)
for k := range tmm {
allKeys.Insert(k)
}
m := make([]telemetryFilterConfig, 0, len(allKeys))
for _, k := range allKeys.SortedList() {
p := t.fetchProvider(k)
if p == nil {
continue
}
_, logging := tml[k]
_, metrics := tmm[k]
cfg := telemetryFilterConfig{
Provider: p,
metricsConfig: tmm[k],
AccessLogging: logging,
Metrics: metrics,
LogsFilter: logsFilter,
}
m = append(m, cfg)
}
var res interface{}
// Finally, compute the actual filters based on the protocol
switch protocol {
case networking.ListenerProtocolHTTP:
res = buildHTTPTelemetryFilter(class, m)
default:
res = buildTCPTelemetryFilter(class, m)
}
// Update cache
t.computedMetricsFilters[key] = res
return res
}
// mergeLogs returns the set of access logging providers and the logging filter for the given configuration.
func mergeLogs(logs []*tpb.AccessLogging, mesh *meshconfig.MeshConfig, mode tpb.WorkloadMode) (sets.Set, *tpb.AccessLogging_Filter) {
providers := sets.New()
if len(logs) == 0 {
for _, dp := range mesh.GetDefaultProviders().GetAccessLogging() {
// Insert the default provider.
providers.Insert(dp)
}
return providers, nil
}
var loggingFilter *tpb.AccessLogging_Filter
providerNames := mesh.GetDefaultProviders().GetAccessLogging()
for _, m := range logs {
names := getProviderNames(m.Providers)
if len(names) > 0 {
providerNames = names
}
if m.Filter != nil {
loggingFilter = m.Filter
}
}
inScopeProviders := sets.New(providerNames...)
parentProviders := mesh.GetDefaultProviders().GetAccessLogging()
for _, m := range logs {
providerNames := getProviderNames(m.Providers)
if len(providerNames) == 0 {
providerNames = parentProviders
}
parentProviders = providerNames
for _, provider := range providerNames {
if !inScopeProviders.Contains(provider) {
// This provider is not in scope; skip it.
// This occurs when a higher-level provider is overridden at a lower level.
continue
}
if !matchWorkloadMode(m.Match, mode) {
continue
}
if m.GetDisabled().GetValue() {
providers.Delete(provider)
continue
}
providers.Insert(provider)
}
}
return providers, loggingFilter
}
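// Worked example (hedged; provider names are assumptions): with mesh default access
// logging providers ["envoy"], a root-level AccessLogging listing ["stackdriver"],
// and a namespace-level AccessLogging with disabled=true and no providers, the
// namespace entry inherits ["stackdriver"] and deletes it, so the call returns an
// empty provider set, i.e. access logging is explicitly disabled:
//
//	providers, filter := mergeLogs(logs, mesh, tpb.WorkloadMode_SERVER)
//	// providers.SortedList() == []string{}, filter == nil
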
func matchWorkloadMode(selector *tpb.AccessLogging_LogSelector, mode tpb.WorkloadMode) bool {
if selector == nil {
return true
}
if selector.Mode == tpb.WorkloadMode_CLIENT_AND_SERVER {
return true
}
return selector.Mode == mode
}
func (t *Telemetries) namespaceWideTelemetryConfig(namespace string) Telemetry {
for _, tel := range t.NamespaceToTelemetries[namespace] {
if len(tel.Spec.GetSelector().GetMatchLabels()) == 0 {
return tel
}
}
return Telemetry{}
}
// fetchProvider finds the matching ExtensionProvider from the mesh config
func (t *Telemetries) fetchProvider(m string) *meshconfig.MeshConfig_ExtensionProvider {
for _, p := range t.meshConfig.ExtensionProviders {
if strings.EqualFold(m, p.Name) {
return p
}
}
return nil
}
var allMetrics = func() []string {
r := make([]string, 0, len(tpb.MetricSelector_IstioMetric_value))
for k := range tpb.MetricSelector_IstioMetric_value {
if k != tpb.MetricSelector_IstioMetric_name[int32(tpb.MetricSelector_ALL_METRICS)] {
r = append(r, k)
}
}
sort.Strings(r)
return r
}()
// mergeMetrics merges many Metrics objects into a normalized configuration
func mergeMetrics(metrics []*tpb.Metrics, mesh *meshconfig.MeshConfig) map[string]metricsConfig {
type metricOverride struct {
Disabled *wrappers.BoolValue
TagOverrides map[string]*tpb.MetricsOverrides_TagOverride
}
// provider -> mode -> metric -> overrides
providers := map[string]map[tpb.WorkloadMode]map[string]metricOverride{}
if len(metrics) == 0 {
for _, dp := range mesh.GetDefaultProviders().GetMetrics() {
// Insert the default provider. It has no overrides; presence of the key is sufficient to
// get the filter created.
providers[dp] = map[tpb.WorkloadMode]map[string]metricOverride{}
}
}
providerNames := mesh.GetDefaultProviders().GetMetrics()
for _, m := range metrics {
names := getProviderNames(m.Providers)
// If providers is set, it overrides the parent. If not, inherit from the parent. It is not a deep merge.
if len(names) > 0 {
providerNames = names
}
}
// Record the names of all providers we should configure. Anything else we will ignore
inScopeProviders := sets.New(providerNames...)
parentProviders := mesh.GetDefaultProviders().GetMetrics()
disabledAllMetricsProviders := sets.New()
for _, m := range metrics {
providerNames := getProviderNames(m.Providers)
// If providers is not set, use parent's
if len(providerNames) == 0 {
providerNames = parentProviders
}
parentProviders = providerNames
for _, provider := range providerNames {
if !inScopeProviders.Contains(provider) {
// This provider is not in scope; skip it.
// This occurs when a higher-level provider is overridden at a lower level.
continue
}
if _, f := providers[provider]; !f {
providers[provider] = map[tpb.WorkloadMode]map[string]metricOverride{
tpb.WorkloadMode_CLIENT: {},
tpb.WorkloadMode_SERVER: {},
}
}
mp := providers[provider]
// For each override, we normalize the configuration. The metrics list is an ordered list - latter
// elements have precedence. As a result, we will apply updates on top of previous entries.
for _, o := range m.Overrides {
// if we disable all metrics, we should drop the entire filter
if isAllMetrics(o.GetMatch()) && o.Disabled.GetValue() {
disabledAllMetricsProviders.Insert(provider)
continue
}
// A higher scope (e.g. the root namespace) may disable all metrics, but a more specific override re-enables the provider.
disabledAllMetricsProviders.Delete(provider)
metricsNames := getMatches(o.GetMatch())
// If client or server is set explicitly, only apply there. Otherwise, we will apply to both.
// Note: client and server keys may end up the same, which is fine
for _, mode := range getModes(o.GetMatch().GetMode()) {
// Next, get all matches.
// This is a bit funky because the matches are a oneof of enum and custom metric name. We normalize
// these to strings, so we may end up with a list like [REQUEST_COUNT, my-custom-metric].
// TODO: we always flatten ALL_METRICS into each metric mode. For some stats providers (prometheus),
// we are able to apply overrides to all metrics directly rather than duplicating the config.
// We should tweak this to collapse to this mode where possible
for _, metricName := range metricsNames {
if _, f := mp[mode]; !f {
mp[mode] = map[string]metricOverride{}
}
override := mp[mode][metricName]
if o.Disabled != nil {
override.Disabled = o.Disabled
}
for k, v := range o.TagOverrides {
if override.TagOverrides == nil {
override.TagOverrides = map[string]*tpb.MetricsOverrides_TagOverride{}
}
override.TagOverrides[k] = v
}
mp[mode][metricName] = override
}
}
}
}
}
processed := map[string]metricsConfig{}
for provider, modeMap := range providers {
if disabledAllMetricsProviders.Contains(provider) {
continue
}
for mode, metricMap := range modeMap {
for metric, override := range metricMap {
tags := []tagOverride{}
for k, v := range override.TagOverrides {
o := tagOverride{Name: k}
switch v.Operation {
case tpb.MetricsOverrides_TagOverride_REMOVE:
o.Remove = true
o.Value = ""
case tpb.MetricsOverrides_TagOverride_UPSERT:
o.Value = v.GetValue()
o.Remove = false
}
tags = append(tags, o)
}
// Keep order deterministic
sort.Slice(tags, func(i, j int) bool {
return tags[i].Name < tags[j].Name
})
mo := metricsOverride{
Name: metric,
Disabled: override.Disabled.GetValue(),
Tags: tags,
}
tmm := processed[provider]
switch mode {
case tpb.WorkloadMode_CLIENT:
tmm.ClientMetrics = append(tmm.ClientMetrics, mo)
default:
tmm.ServerMetrics = append(tmm.ServerMetrics, mo)
}
processed[provider] = tmm
}
}
// Keep order deterministic
tmm := processed[provider]
sort.Slice(tmm.ServerMetrics, func(i, j int) bool {
return tmm.ServerMetrics[i].Name < tmm.ServerMetrics[j].Name
})
sort.Slice(tmm.ClientMetrics, func(i, j int) bool {
return tmm.ClientMetrics[i].Name < tmm.ClientMetrics[j].Name
})
processed[provider] = tmm
}
return processed
}
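// Worked example (hedged; names are assumptions): a single Metrics entry for provider
// "prometheus" that upserts a tag on REQUEST_COUNT, with no mode set, is applied to
// both CLIENT and SERVER and normalizes to:
//
//	mergeMetrics(metrics, mesh) == map[string]metricsConfig{
//		"prometheus": {
//			ClientMetrics: []metricsOverride{{Name: "REQUEST_COUNT", Tags: []tagOverride{{Name: "my_tag", Value: "request.host"}}}},
//			ServerMetrics: []metricsOverride{{Name: "REQUEST_COUNT", Tags: []tagOverride{{Name: "my_tag", Value: "request.host"}}}},
//		},
//	}
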
func getProviderNames(providers []*tpb.ProviderRef) []string {
res := make([]string, 0, len(providers))
for _, p := range providers {
res = append(res, p.GetName())
}
return res
}
func getModes(mode tpb.WorkloadMode) []tpb.WorkloadMode {
switch mode {
case tpb.WorkloadMode_CLIENT, tpb.WorkloadMode_SERVER:
return []tpb.WorkloadMode{mode}
default:
return []tpb.WorkloadMode{tpb.WorkloadMode_CLIENT, tpb.WorkloadMode_SERVER}
}
}
func isAllMetrics(match *tpb.MetricSelector) bool {
switch m := match.GetMetricMatch().(type) {
case *tpb.MetricSelector_CustomMetric:
return false
case *tpb.MetricSelector_Metric:
return m.Metric == tpb.MetricSelector_ALL_METRICS
default:
return false
}
}
func getMatches(match *tpb.MetricSelector) []string {
switch m := match.GetMetricMatch().(type) {
case *tpb.MetricSelector_CustomMetric:
return []string{m.CustomMetric}
case *tpb.MetricSelector_Metric:
if m.Metric == tpb.MetricSelector_ALL_METRICS {
return allMetrics
}
return []string{m.Metric.String()}
default:
return allMetrics
}
}
func statsRootIDForClass(class networking.ListenerClass) string {
switch class {
case networking.ListenerClassSidecarInbound:
return "stats_inbound"
default:
return "stats_outbound"
}
}
func buildHTTPTelemetryFilter(class networking.ListenerClass, metricsCfg []telemetryFilterConfig) []*hcm.HttpFilter {
res := make([]*hcm.HttpFilter, 0, len(metricsCfg))
for _, cfg := range metricsCfg {
switch cfg.Provider.GetProvider().(type) {
case *meshconfig.MeshConfig_ExtensionProvider_Prometheus:
if !cfg.Metrics {
// Prometheus does not support access logging; skip providers that have no metrics config
continue
}
statsCfg := generateStatsConfig(class, cfg)
vmConfig := ConstructVMConfig("/etc/istio/extensions/stats-filter.compiled.wasm", "envoy.wasm.stats")
root := statsRootIDForClass(class)
vmConfig.VmConfig.VmId = root
wasmConfig := &httpwasm.Wasm{
Config: &wasm.PluginConfig{
RootId: root,
Vm: vmConfig,
Configuration: statsCfg,
},
}
f := &hcm.HttpFilter{
Name: xds.StatsFilterName,
ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: networking.MessageToAny(wasmConfig)},
}
res = append(res, f)
case *meshconfig.MeshConfig_ExtensionProvider_Stackdriver:
sdCfg := generateSDConfig(class, cfg)
vmConfig := ConstructVMConfig("", "envoy.wasm.null.stackdriver")
vmConfig.VmConfig.VmId = stackdriverVMID(class)
wasmConfig := &httpwasm.Wasm{
Config: &wasm.PluginConfig{
RootId: vmConfig.VmConfig.VmId,
Vm: vmConfig,
Configuration: sdCfg,
},
}
f := &hcm.HttpFilter{
Name: xds.StackdriverFilterName,
ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: networking.MessageToAny(wasmConfig)},
}
res = append(res, f)
default:
// Only Prometheus and Stackdriver are supported currently
continue
}
}
return res
}
func buildTCPTelemetryFilter(class networking.ListenerClass, telemetryConfigs []telemetryFilterConfig) []*listener.Filter {
res := []*listener.Filter{}
for _, telemetryCfg := range telemetryConfigs {
switch telemetryCfg.Provider.GetProvider().(type) {
case *meshconfig.MeshConfig_ExtensionProvider_Prometheus:
cfg := generateStatsConfig(class, telemetryCfg)
vmConfig := ConstructVMConfig("/etc/istio/extensions/stats-filter.compiled.wasm", "envoy.wasm.stats")
root := statsRootIDForClass(class)
vmConfig.VmConfig.VmId = "tcp_" + root
wasmConfig := &wasmfilter.Wasm{
Config: &wasm.PluginConfig{
RootId: root,
Vm: vmConfig,
Configuration: cfg,
},
}
f := &listener.Filter{
Name: xds.StatsFilterName,
ConfigType: &listener.Filter_TypedConfig{TypedConfig: networking.MessageToAny(wasmConfig)},
}
res = append(res, f)
case *meshconfig.MeshConfig_ExtensionProvider_Stackdriver:
cfg := generateSDConfig(class, telemetryCfg)
vmConfig := ConstructVMConfig("", "envoy.wasm.null.stackdriver")
vmConfig.VmConfig.VmId = stackdriverVMID(class)
wasmConfig := &wasmfilter.Wasm{
Config: &wasm.PluginConfig{
RootId: vmConfig.VmConfig.VmId,
Vm: vmConfig,
Configuration: cfg,
},
}
f := &listener.Filter{
Name: xds.StackdriverFilterName,
ConfigType: &listener.Filter_TypedConfig{TypedConfig: networking.MessageToAny(wasmConfig)},
}
res = append(res, f)
default:
// Only Prometheus and Stackdriver are supported currently
continue
}
}
return res
}
func stackdriverVMID(class networking.ListenerClass) string {
switch class {
case networking.ListenerClassSidecarInbound:
return "stackdriver_inbound"
default:
return "stackdriver_outbound"
}
}
var metricToSDServerMetrics = map[string]string{
"REQUEST_COUNT": "server/request_count",
"REQUEST_DURATION": "server/response_latencies",
"REQUEST_SIZE": "server/request_bytes",
"RESPONSE_SIZE": "server/response_bytes",
"TCP_OPENED_CONNECTIONS": "server/connection_open_count",
"TCP_CLOSED_CONNECTIONS": "server/connection_close_count",
"TCP_SENT_BYTES": "server/sent_bytes_count",
"TCP_RECEIVED_BYTES": "server/received_bytes_count",
"GRPC_REQUEST_MESSAGES": "",
"GRPC_RESPONSE_MESSAGES": "",
}
var metricToSDClientMetrics = map[string]string{
"REQUEST_COUNT": "client/request_count",
"REQUEST_DURATION": "client/response_latencies",
"REQUEST_SIZE": "client/request_bytes",
"RESPONSE_SIZE": "client/response_bytes",
"TCP_OPENED_CONNECTIONS": "client/connection_open_count",
"TCP_CLOSED_CONNECTIONS": "client/connection_close_count",
"TCP_SENT_BYTES": "client/sent_bytes_count",
"TCP_RECEIVED_BYTES": "client/received_bytes_count",
"GRPC_REQUEST_MESSAGES": "",
"GRPC_RESPONSE_MESSAGES": "",
}
// used to unescape CEL expressions in Stackdriver config serialization
var jsonUnescaper = strings.NewReplacer(`\u003e`, `>`, `\u003c`, `<`, `\u0026`, `&`)
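// Illustrative example: the HTML-escaped JSON output turns CEL operators into unicode
// escapes, and jsonUnescaper restores them so the filter receives a valid expression:
//
//	jsonUnescaper.Replace(`response.code \u003e= 400`)
//	// => "response.code >= 400"
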
func generateSDConfig(class networking.ListenerClass, telemetryConfig telemetryFilterConfig) *anypb.Any {
cfg := sd.PluginConfig{
DisableHostHeaderFallback: disableHostHeaderFallback(class),
}
metricNameMap := metricToSDClientMetrics
if class == networking.ListenerClassSidecarInbound {
metricNameMap = metricToSDServerMetrics
}
for _, override := range telemetryConfig.MetricsForClass(class) {
metricName, f := metricNameMap[override.Name]
if !f {
// Not a predefined metric, must be a custom one
metricName = override.Name
}
if metricName == "" {
continue
}
if cfg.MetricsOverrides == nil {
cfg.MetricsOverrides = map[string]*sd.MetricsOverride{}
}
if _, f := cfg.MetricsOverrides[metricName]; !f {
cfg.MetricsOverrides[metricName] = &sd.MetricsOverride{}
}
cfg.MetricsOverrides[metricName].Drop = override.Disabled
for _, t := range override.Tags {
if t.Remove {
// Remove is not supported by SD
continue
}
if cfg.MetricsOverrides[metricName].TagOverrides == nil {
cfg.MetricsOverrides[metricName].TagOverrides = map[string]string{}
}
cfg.MetricsOverrides[metricName].TagOverrides[t.Name] = t.Value
}
}
if telemetryConfig.AccessLogging {
if telemetryConfig.LogsFilter != nil {
cfg.AccessLoggingFilterExpression = telemetryConfig.LogsFilter.Expression
} else {
if class == networking.ListenerClassSidecarInbound {
cfg.AccessLogging = sd.PluginConfig_FULL
} else {
// this can be achieved via CEL: `response.code >= 400 || response.code == 0`
cfg.AccessLogging = sd.PluginConfig_ERRORS_ONLY
}
}
} else {
// The field is deprecated, but until it is removed we need to set it.
cfg.DisableServerAccessLogging = true // nolint: staticcheck
}
cfg.MetricExpiryDuration = durationpb.New(1 * time.Hour)
// In WASM we are not actually processing protobuf at all, so we need to encode this to JSON
cfgJSON, _ := protomarshal.MarshalProtoNames(&cfg)
// MarshalProtoNames() forces HTML-escaped JSON encoding.
// This can be problematic for CEL expressions, particularly those using
// '>', '<', and '&'. It is easier to use replace-all operations than it is
// to mimic MarshalProtoNames() with a configured JSON encoder.
pb := &wrappers.StringValue{Value: jsonUnescaper.Replace(string(cfgJSON))}
return networking.MessageToAny(pb)
}
var metricToPrometheusMetric = map[string]string{
"REQUEST_COUNT": "requests_total",
"REQUEST_DURATION": "request_duration_milliseconds",
"REQUEST_SIZE": "request_bytes",
"RESPONSE_SIZE": "response_bytes",
"TCP_OPENED_CONNECTIONS": "tcp_connections_opened_total",
"TCP_CLOSED_CONNECTIONS": "tcp_connections_closed_total",
"TCP_SENT_BYTES": "tcp_sent_bytes_total",
"TCP_RECEIVED_BYTES": "tcp_received_bytes_total",
"GRPC_REQUEST_MESSAGES": "request_messages_total",
"GRPC_RESPONSE_MESSAGES": "response_messages_total",
}
func generateStatsConfig(class networking.ListenerClass, metricsCfg telemetryFilterConfig) *anypb.Any {
cfg := stats.PluginConfig{
DisableHostHeaderFallback: disableHostHeaderFallback(class),
}
for _, override := range metricsCfg.MetricsForClass(class) {
metricName, f := metricToPrometheusMetric[override.Name]
if !f {
// Not a predefined metric, must be a custom one
metricName = override.Name
}
mc := &stats.MetricConfig{
Dimensions: map[string]string{},
Name: metricName,
Drop: override.Disabled,
}
for _, t := range override.Tags {
if t.Remove {
mc.TagsToRemove = append(mc.TagsToRemove, t.Name)
} else {
mc.Dimensions[t.Name] = t.Value
}
}
cfg.Metrics = append(cfg.Metrics, mc)
}
// In WASM we are not actually processing protobuf at all, so we need to encode this to JSON
cfgJSON, _ := protomarshal.MarshalProtoNames(&cfg)
return networking.MessageToAny(&wrappers.StringValue{Value: string(cfgJSON)})
}
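// Hedged note: the stats and Stackdriver Wasm filters receive their configuration as a
// JSON string wrapped in google.protobuf.StringValue rather than as a typed proto, so
// the Any built above carries JSON roughly like the following (the dimension name and
// value are illustrative assumptions):
//
//	{"metrics":[{"name":"requests_total","dimensions":{"my_tag":"request.host"}}]}
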
func disableHostHeaderFallback(class networking.ListenerClass) bool {
return class == networking.ListenerClassSidecarInbound || class == networking.ListenerClassGateway
}