// Copyright Istio Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v1alpha3
import (
"crypto/md5"
"encoding/hex"
"fmt"
"math"
"sort"
"strconv"
"strings"

cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
auth "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
http "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
any "google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/structpb"
wrappers "google.golang.org/protobuf/types/known/wrapperspb"

meshconfig "istio.io/api/mesh/v1alpha1"
networking "istio.io/api/networking/v1alpha3"
"istio.io/pkg/log"

"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/telemetry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
authn_model "github.com/apache/dubbo-go-pixiu/pilot/pkg/security/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
xdsfilters "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/filters"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
istio_cluster "github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
"github.com/apache/dubbo-go-pixiu/pkg/security"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
var istioMtlsTransportSocketMatch = &structpb.Struct{
Fields: map[string]*structpb.Value{
model.TLSModeLabelShortname: {Kind: &structpb.Value_StringValue{StringValue: model.IstioMutualTLSModeLabel}},
},
}
// h2UpgradeMap is the truth table specifying when an h2 upgrade takes place, keyed by the mesh-wide policy and the per-destination override.
var h2UpgradeMap = map[upgradeTuple]bool{
{meshconfig.MeshConfig_DO_NOT_UPGRADE, networking.ConnectionPoolSettings_HTTPSettings_UPGRADE}: true,
{meshconfig.MeshConfig_DO_NOT_UPGRADE, networking.ConnectionPoolSettings_HTTPSettings_DO_NOT_UPGRADE}: false,
{meshconfig.MeshConfig_DO_NOT_UPGRADE, networking.ConnectionPoolSettings_HTTPSettings_DEFAULT}: false,
{meshconfig.MeshConfig_UPGRADE, networking.ConnectionPoolSettings_HTTPSettings_UPGRADE}: true,
{meshconfig.MeshConfig_UPGRADE, networking.ConnectionPoolSettings_HTTPSettings_DO_NOT_UPGRADE}: false,
{meshconfig.MeshConfig_UPGRADE, networking.ConnectionPoolSettings_HTTPSettings_DEFAULT}: true,
}
// passthroughHttpProtocolOptions are the HTTP protocol options used for passthrough clusters.
// nolint
var passthroughHttpProtocolOptions = util.MessageToAny(&http.HttpProtocolOptions{
UpstreamProtocolOptions: &http.HttpProtocolOptions_UseDownstreamProtocolConfig{
UseDownstreamProtocolConfig: &http.HttpProtocolOptions_UseDownstreamHttpConfig{
HttpProtocolOptions: &core.Http1ProtocolOptions{},
Http2ProtocolOptions: http2ProtocolOptions(),
},
},
})
// MutableCluster wraps a Cluster object along with options that are applied before it is built.
type MutableCluster struct {
cluster *cluster.Cluster
// httpProtocolOptions stores the HttpProtocolOptions which will be marshaled when build is called.
httpProtocolOptions *http.HttpProtocolOptions
}
// metadataCerts holds the client certificate paths specified in proxy metadata.
type metadataCerts struct {
// tlsClientCertChain is the absolute path to client cert-chain file
tlsClientCertChain string
// tlsClientKey is the absolute path to client private key file
tlsClientKey string
// tlsClientRootCert is the absolute path to client root cert file
tlsClientRootCert string
}
// ClusterBuilder provides an abstraction for building Envoy Clusters.
type ClusterBuilder struct {
// Proxy related information used to build clusters.
serviceInstances []*model.ServiceInstance // Service instances of Proxy.
metadataCerts *metadataCerts // Client certificates specified in metadata.
clusterID string // Cluster in which proxy is running.
proxyID string // Identifier that uniquely identifies a proxy.
proxyVersion string // Version of Proxy.
proxyType model.NodeType // Indicates whether the proxy is sidecar or gateway.
sidecarScope *model.SidecarScope // Computed sidecar for the proxy.
passThroughBindIP string // Passthrough IP to be used while building clusters.
supportsIPv4 bool // Whether the proxy's IP addresses include an IPv4 address.
supportsIPv6 bool // Whether the proxy's IP addresses include an IPv6 address.
locality *core.Locality // Locality information of proxy.
proxyLabels map[string]string // Proxy labels.
proxyView model.ProxyView // Proxy view of endpoints.
proxyIPAddresses []string // IP addresses the proxy is listening on.
configNamespace string // Proxy config namespace.
// PushRequest to look for updates.
req *model.PushRequest
cache model.XdsCache
}
// NewClusterBuilder builds an instance of ClusterBuilder.
func NewClusterBuilder(proxy *model.Proxy, req *model.PushRequest, cache model.XdsCache) *ClusterBuilder {
cb := &ClusterBuilder{
serviceInstances: proxy.ServiceInstances,
proxyID: proxy.ID,
proxyType: proxy.Type,
proxyVersion: proxy.Metadata.IstioVersion,
sidecarScope: proxy.SidecarScope,
passThroughBindIP: getPassthroughBindIP(proxy),
supportsIPv4: proxy.SupportsIPv4(),
supportsIPv6: proxy.SupportsIPv6(),
locality: proxy.Locality,
proxyLabels: proxy.Metadata.Labels,
proxyView: proxy.GetView(),
proxyIPAddresses: proxy.IPAddresses,
configNamespace: proxy.ConfigNamespace,
req: req,
cache: cache,
}
if proxy.Metadata != nil {
if proxy.Metadata.TLSClientCertChain != "" {
cb.metadataCerts = &metadataCerts{
tlsClientCertChain: proxy.Metadata.TLSClientCertChain,
tlsClientKey: proxy.Metadata.TLSClientKey,
tlsClientRootCert: proxy.Metadata.TLSClientRootCert,
}
}
cb.clusterID = string(proxy.Metadata.ClusterID)
}
return cb
}
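// String returns a "~"-joined representation of the certificate paths; it is used when computing cluster cache keys.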
func (m *metadataCerts) String() string {
return m.tlsClientCertChain + "~" + m.tlsClientKey + "~" + m.tlsClientRootCert
}
// NewMutableCluster initializes MutableCluster with the cluster passed.
func NewMutableCluster(cluster *cluster.Cluster) *MutableCluster {
return &MutableCluster{
cluster: cluster,
}
}
// sidecarProxy returns true if the clusters are being built for a sidecar proxy, false otherwise.
func (cb *ClusterBuilder) sidecarProxy() bool {
return cb.proxyType == model.SidecarProxy
}
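// buildSubsetCluster builds a cluster for a DestinationRule subset, applying the merged
// destination-level and subset-level traffic policies on top of the default cluster.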
func (cb *ClusterBuilder) buildSubsetCluster(opts buildClusterOpts, destRule *config.Config, subset *networking.Subset, service *model.Service,
proxyView model.ProxyView) *cluster.Cluster {
opts.serviceMTLSMode = cb.req.Push.BestEffortInferServiceMTLSMode(subset.GetTrafficPolicy(), service, opts.port)
var subsetClusterName string
var defaultSni string
if opts.clusterMode == DefaultClusterMode {
subsetClusterName = model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, opts.port.Port)
defaultSni = model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, opts.port.Port)
} else {
subsetClusterName = model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, opts.port.Port)
}
// Clusters with discovery type STATIC or STRICT_DNS rely on the cluster.LoadAssignment field.
// ServiceEntries need to filter hosts based on subset.labels in order to perform weighted routing.
var lbEndpoints []*endpoint.LocalityLbEndpoints
isPassthrough := subset.GetTrafficPolicy().GetLoadBalancer().GetSimple() == networking.LoadBalancerSettings_PASSTHROUGH
clusterType := opts.mutable.cluster.GetType()
if isPassthrough {
clusterType = cluster.Cluster_ORIGINAL_DST
}
if !(isPassthrough || clusterType == cluster.Cluster_EDS) {
if len(subset.Labels) != 0 {
lbEndpoints = cb.buildLocalityLbEndpoints(proxyView, service, opts.port.Port, subset.Labels)
} else {
lbEndpoints = cb.buildLocalityLbEndpoints(proxyView, service, opts.port.Port, nil)
}
if len(lbEndpoints) == 0 {
log.Debugf("locality endpoints missing for cluster %s", subsetClusterName)
}
}
subsetCluster := cb.buildDefaultCluster(subsetClusterName, clusterType, lbEndpoints, model.TrafficDirectionOutbound, opts.port, service, nil)
if subsetCluster == nil {
return nil
}
if len(cb.req.Push.Mesh.OutboundClusterStatName) != 0 {
subsetCluster.cluster.AltStatName = telemetry.BuildStatPrefix(cb.req.Push.Mesh.OutboundClusterStatName,
string(service.Hostname), subset.Name, opts.port, &service.Attributes)
}
// Apply traffic policy for subset cluster with the destination rule traffic policy.
opts.mutable = subsetCluster
opts.istioMtlsSni = defaultSni
// If subset has a traffic policy, apply it so that it overrides the destination rule traffic policy.
opts.policy = MergeTrafficPolicy(opts.policy, subset.TrafficPolicy, opts.port)
if destRule != nil {
destinationRule := CastDestinationRule(destRule)
opts.isDrWithSelector = destinationRule.GetWorkloadSelector() != nil
}
// Apply traffic policy for the subset cluster.
cb.applyTrafficPolicy(opts)
maybeApplyEdsConfig(subsetCluster.cluster)
if cb.proxyType == model.Router || opts.direction == model.TrafficDirectionOutbound {
cb.applyMetadataExchange(opts.mutable.cluster)
}
// Add the DestinationRule+subsets metadata. Metadata here is generated on a per-cluster
// basis in buildDefaultCluster, so we can just insert without a copy.
subsetCluster.cluster.Metadata = util.AddConfigInfoMetadata(subsetCluster.cluster.Metadata, destRule.Meta)
util.AddSubsetToMetadata(subsetCluster.cluster.Metadata, subset.Name)
return subsetCluster.build()
}
// applyDestinationRule applies the destination rule for the Service if one exists. It returns any
// subset clusters created while applying the destination rule.
func (cb *ClusterBuilder) applyDestinationRule(mc *MutableCluster, clusterMode ClusterMode, service *model.Service,
port *model.Port, proxyView model.ProxyView, destRule *config.Config, serviceAccounts []string) []*cluster.Cluster {
destinationRule := CastDestinationRule(destRule)
// merge applicable port level traffic policy settings
trafficPolicy := MergeTrafficPolicy(nil, destinationRule.GetTrafficPolicy(), port)
opts := buildClusterOpts{
mesh: cb.req.Push.Mesh,
serviceInstances: cb.serviceInstances,
mutable: mc,
policy: trafficPolicy,
port: port,
clusterMode: clusterMode,
direction: model.TrafficDirectionOutbound,
}
if clusterMode == DefaultClusterMode {
opts.serviceAccounts = serviceAccounts
opts.istioMtlsSni = model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port)
opts.meshExternal = service.MeshExternal
opts.serviceRegistry = service.Attributes.ServiceRegistry
opts.serviceMTLSMode = cb.req.Push.BestEffortInferServiceMTLSMode(destinationRule.GetTrafficPolicy(), service, port)
}
if destRule != nil {
opts.isDrWithSelector = destinationRule.GetWorkloadSelector() != nil
}
// Apply traffic policy for the main default cluster.
cb.applyTrafficPolicy(opts)
// Apply EdsConfig if needed. This should be called after the traffic policy is applied because
// the traffic policy might change the discovery type.
maybeApplyEdsConfig(mc.cluster)
if cb.proxyType == model.Router || opts.direction == model.TrafficDirectionOutbound {
cb.applyMetadataExchange(opts.mutable.cluster)
}
if destRule != nil {
mc.cluster.Metadata = util.AddConfigInfoMetadata(mc.cluster.Metadata, destRule.Meta)
}
subsetClusters := make([]*cluster.Cluster, 0)
for _, subset := range destinationRule.GetSubsets() {
subsetCluster := cb.buildSubsetCluster(opts, destRule, subset, service, proxyView)
if subsetCluster != nil {
subsetClusters = append(subsetClusters, subsetCluster)
}
}
return subsetClusters
}
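// applyMetadataExchange appends the TCP metadata exchange filter to the cluster when the MetadataExchange feature is enabled.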
func (cb *ClusterBuilder) applyMetadataExchange(c *cluster.Cluster) {
if features.MetadataExchange {
c.Filters = append(c.Filters, xdsfilters.TCPClusterMx)
}
}
// MergeTrafficPolicy returns the merged TrafficPolicy for a destination-level and subset-level policy on a given port.
func MergeTrafficPolicy(original, subsetPolicy *networking.TrafficPolicy, port *model.Port) *networking.TrafficPolicy {
if subsetPolicy == nil {
return original
}
// Make sure that top-level port-level settings have already been merged for the given port.
if original != nil && len(original.PortLevelSettings) != 0 {
original = MergeTrafficPolicy(nil, original, port)
}
mergedPolicy := &networking.TrafficPolicy{}
if original != nil {
mergedPolicy.ConnectionPool = original.ConnectionPool
mergedPolicy.LoadBalancer = original.LoadBalancer
mergedPolicy.OutlierDetection = original.OutlierDetection
mergedPolicy.Tls = original.Tls
}
// Override with subset values.
if subsetPolicy.ConnectionPool != nil {
mergedPolicy.ConnectionPool = subsetPolicy.ConnectionPool
}
if subsetPolicy.OutlierDetection != nil {
mergedPolicy.OutlierDetection = subsetPolicy.OutlierDetection
}
if subsetPolicy.LoadBalancer != nil {
mergedPolicy.LoadBalancer = subsetPolicy.LoadBalancer
}
if subsetPolicy.Tls != nil {
mergedPolicy.Tls = subsetPolicy.Tls
}
// Check if port level overrides exist, if yes override with them.
if port != nil {
for _, p := range subsetPolicy.PortLevelSettings {
if p.Port != nil && uint32(port.Port) == p.Port.Number {
// Per the docs, port-level policies do not inherit; they fall back to defaults if not provided.
mergedPolicy.ConnectionPool = p.ConnectionPool
mergedPolicy.OutlierDetection = p.OutlierDetection
mergedPolicy.LoadBalancer = p.LoadBalancer
mergedPolicy.Tls = p.Tls
break
}
}
}
return mergedPolicy
}
// buildDefaultCluster builds the default cluster and also applies default traffic policy.
func (cb *ClusterBuilder) buildDefaultCluster(name string, discoveryType cluster.Cluster_DiscoveryType,
localityLbEndpoints []*endpoint.LocalityLbEndpoints, direction model.TrafficDirection,
port *model.Port, service *model.Service, allInstances []*model.ServiceInstance) *MutableCluster {
if allInstances == nil {
allInstances = cb.serviceInstances
}
c := &cluster.Cluster{
Name: name,
ClusterDiscoveryType: &cluster.Cluster_Type{Type: discoveryType},
}
ec := NewMutableCluster(c)
switch discoveryType {
case cluster.Cluster_STRICT_DNS, cluster.Cluster_LOGICAL_DNS:
if cb.supportsIPv4 {
c.DnsLookupFamily = cluster.Cluster_V4_ONLY
} else {
c.DnsLookupFamily = cluster.Cluster_V6_ONLY
}
dnsRate := cb.req.Push.Mesh.DnsRefreshRate
c.DnsRefreshRate = dnsRate
c.RespectDnsTtl = true
fallthrough
case cluster.Cluster_STATIC:
if len(localityLbEndpoints) == 0 {
cb.req.Push.AddMetric(model.DNSNoEndpointClusters, c.Name, cb.proxyID,
fmt.Sprintf("%s cluster without endpoints %s found while pushing CDS", discoveryType.String(), c.Name))
return nil
}
c.LoadAssignment = &endpoint.ClusterLoadAssignment{
ClusterName: name,
Endpoints: localityLbEndpoints,
}
}
// For inbound clusters, the default traffic policy is used. For outbound clusters, the default
// traffic policy is applied first and then overridden by any traffic policy specified in the destination rule.
opts := buildClusterOpts{
mesh: cb.req.Push.Mesh,
mutable: ec,
policy: nil,
port: port,
serviceAccounts: nil,
istioMtlsSni: "",
clusterMode: DefaultClusterMode,
direction: direction,
serviceInstances: cb.serviceInstances,
}
// Decide whether the cluster corresponds to a service external to the mesh.
if direction == model.TrafficDirectionInbound {
// Inbound cluster always corresponds to service in the mesh.
opts.meshExternal = false
} else if service != nil {
// otherwise, read this information from service object.
opts.meshExternal = service.MeshExternal
}
cb.setUpstreamProtocol(ec, port, direction)
addTelemetryMetadata(opts, service, direction, allInstances)
addNetworkingMetadata(opts, service, direction)
return ec
}
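// clusterCache includes the variables that can influence a cluster configuration; it is used
// as the key for storing and looking up built clusters in the XDS cache.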
type clusterCache struct {
clusterName string
// proxy related cache fields
proxyVersion string // will be matched by envoyfilter patches
locality *core.Locality // identifies the locality the cluster is generated for
proxyClusterID string // identifies the kubernetes cluster a proxy is in
proxySidecar bool // identifies if this proxy is a Sidecar
proxyView model.ProxyView
metadataCerts *metadataCerts // metadata certificates of proxy
// service attributes
http2 bool // http2 identifies if the cluster is for an http2 service
downstreamAuto bool
supportsIPv4 bool
// Dependent configs
service *model.Service
destinationRule *config.Config
envoyFilterKeys []string
peerAuthVersion string // identifies the versions of all peer authentications
serviceAccounts []string // contains all the service accounts associated with the service
}
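// Key hashes all fields that contribute to the generated cluster, so any change to them
// produces a different cache key.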
func (t *clusterCache) Key() string {
params := []string{
t.clusterName, t.proxyVersion, util.LocalityToString(t.locality),
t.proxyClusterID, strconv.FormatBool(t.proxySidecar),
strconv.FormatBool(t.http2), strconv.FormatBool(t.downstreamAuto), strconv.FormatBool(t.supportsIPv4),
}
if t.proxyView != nil {
params = append(params, t.proxyView.String())
}
if t.metadataCerts != nil {
params = append(params, t.metadataCerts.String())
}
if t.service != nil {
params = append(params, string(t.service.Hostname)+"/"+t.service.Attributes.Namespace)
}
if t.destinationRule != nil {
params = append(params, t.destinationRule.Name+"/"+t.destinationRule.Namespace)
}
params = append(params, t.envoyFilterKeys...)
params = append(params, t.peerAuthVersion)
params = append(params, t.serviceAccounts...)
hash := md5.New()
for _, param := range params {
hash.Write([]byte(param))
}
sum := hash.Sum(nil)
return hex.EncodeToString(sum)
}
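// DependentConfigs returns the configs (destination rule, service entry, and envoy filters)
// whose changes invalidate this cache entry.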
func (t clusterCache) DependentConfigs() []model.ConfigKey {
configs := []model.ConfigKey{}
if t.destinationRule != nil {
configs = append(configs, model.ConfigKey{Kind: gvk.DestinationRule, Name: t.destinationRule.Name, Namespace: t.destinationRule.Namespace})
}
if t.service != nil {
configs = append(configs, model.ConfigKey{Kind: gvk.ServiceEntry, Name: string(t.service.Hostname), Namespace: t.service.Attributes.Namespace})
}
for _, efKey := range t.envoyFilterKeys {
items := strings.Split(efKey, "/")
configs = append(configs, model.ConfigKey{Kind: gvk.EnvoyFilter, Name: items[1], Namespace: items[0]})
}
return configs
}
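// DependentTypes returns nil; invalidation is tracked per config key via DependentConfigs.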
func (t *clusterCache) DependentTypes() []config.GroupVersionKind {
return nil
}
func (t clusterCache) Cacheable() bool {
return true
}
// buildInboundClusterForPortOrUDS constructs a single inbound cluster. The cluster will be bound to
// `inbound|clusterPort||`, and send traffic to <bind>:<instance.Endpoint.EndpointPort>. A workload
// will have a single inbound cluster per port. In general this works properly, with the exception of
// the Service-oriented DestinationRule, and upstream protocol selection. Our documentation currently
// requires a single protocol per port, and the DestinationRule issue is slated to move to Sidecar.
// Note: clusterPort and instance.Endpoint.EndpointPort are identical for standard Services; however,
// Sidecar.Ingress allows these to be different.
func (cb *ClusterBuilder) buildInboundClusterForPortOrUDS(clusterPort int, bind string,
proxy *model.Proxy, instance *model.ServiceInstance, allInstance []*model.ServiceInstance) *MutableCluster {
clusterName := model.BuildInboundSubsetKey(clusterPort)
localityLbEndpoints := buildInboundLocalityLbEndpoints(bind, instance.Endpoint.EndpointPort)
clusterType := cluster.Cluster_ORIGINAL_DST
if len(localityLbEndpoints) > 0 {
clusterType = cluster.Cluster_STATIC
}
localCluster := cb.buildDefaultCluster(clusterName, clusterType, localityLbEndpoints,
model.TrafficDirectionInbound, instance.ServicePort, instance.Service, allInstance)
if clusterType == cluster.Cluster_ORIGINAL_DST {
// Extend cleanupInterval beyond 5s default. This ensures that upstream connections will stay
// open for up to 60s. With the default of 5s, we may tear things down too quickly for
// infrequently accessed services.
localCluster.cluster.CleanupInterval = &durationpb.Duration{Seconds: 60}
}
// If stat name is configured, build the alt statname.
if len(cb.req.Push.Mesh.InboundClusterStatName) != 0 {
localCluster.cluster.AltStatName = telemetry.BuildStatPrefix(cb.req.Push.Mesh.InboundClusterStatName,
string(instance.Service.Hostname), "", instance.ServicePort, &instance.Service.Attributes)
}
opts := buildClusterOpts{
mesh: cb.req.Push.Mesh,
mutable: localCluster,
policy: nil,
port: instance.ServicePort,
serviceAccounts: nil,
serviceInstances: cb.serviceInstances,
istioMtlsSni: "",
clusterMode: DefaultClusterMode,
direction: model.TrafficDirectionInbound,
}
// When users specify circuit breakers, they need to be set on the receiver end
// (server side) as well as client side, so that the server has enough capacity
// (not the defaults) to handle the increased traffic volume
// TODO: This is not foolproof - if instance is part of multiple services listening on same port,
// choice of inbound cluster is arbitrary. So the connection pool settings may not apply cleanly.
cfg := proxy.SidecarScope.DestinationRule(model.TrafficDirectionInbound, proxy, instance.Service.Hostname)
if cfg != nil {
destinationRule := cfg.Spec.(*networking.DestinationRule)
opts.isDrWithSelector = destinationRule.GetWorkloadSelector() != nil
if destinationRule.TrafficPolicy != nil {
opts.policy = MergeTrafficPolicy(opts.policy, destinationRule.TrafficPolicy, instance.ServicePort)
util.AddConfigInfoMetadata(localCluster.cluster.Metadata, cfg.Meta)
}
}
cb.applyTrafficPolicy(opts)
if bind != LocalhostAddress && bind != LocalhostIPv6Address {
// iptables will redirect our own traffic to localhost back to us if we do not use the "magic" upstream bind
// config which will be skipped.
localCluster.cluster.UpstreamBindConfig = &core.BindConfig{
SourceAddress: &core.SocketAddress{
Address: cb.passThroughBindIP,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: uint32(0),
},
},
}
}
return localCluster
}
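// buildLocalityLbEndpoints builds locality-grouped LbEndpoints for DNS-resolution services,
// skipping endpoints that are not visible or not discoverable from this proxy.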
func (cb *ClusterBuilder) buildLocalityLbEndpoints(proxyView model.ProxyView, service *model.Service,
port int, labels labels.Instance) []*endpoint.LocalityLbEndpoints {
if !(service.Resolution == model.DNSLB || service.Resolution == model.DNSRoundRobinLB) {
return nil
}
instances := cb.req.Push.ServiceInstancesByPort(service, port, labels)
// Determine whether or not the target service is considered local to the cluster
// and should, therefore, not be accessed from outside the cluster.
isClusterLocal := cb.req.Push.IsClusterLocal(service)
lbEndpoints := make(map[string][]*endpoint.LbEndpoint)
for _, instance := range instances {
// Only send endpoints from the networks in the network view requested by the proxy.
// The default network view assigned to the Proxy is nil, in that case match any network.
if !proxyView.IsVisible(instance.Endpoint) {
// Endpoint's network doesn't match the set of networks that the proxy wants to see.
continue
}
// If the downstream service is configured as cluster-local, only include endpoints that
// reside in the same cluster.
if isClusterLocal && (cb.clusterID != string(instance.Endpoint.Locality.ClusterID)) {
continue
}
// TODO(nmittler): Consider merging discoverability policy with cluster-local
// TODO(ramaraochavali): Find a better way here so that we do not have to build a proxy.
// Currently it works because we determine discoverability only by cluster.
if !instance.Endpoint.IsDiscoverableFromProxy(&model.Proxy{Metadata: &model.NodeMetadata{ClusterID: istio_cluster.ID(cb.clusterID)}}) {
continue
}
addr := util.BuildAddress(instance.Endpoint.Address, instance.Endpoint.EndpointPort)
ep := &endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: addr,
},
},
LoadBalancingWeight: &wrappers.UInt32Value{
Value: instance.Endpoint.GetLoadBalancingWeight(),
},
}
labels := instance.Endpoint.Labels
ns := instance.Endpoint.Namespace
if features.CanonicalServiceForMeshExternalServiceEntry && service.MeshExternal {
ns = service.Attributes.Namespace
svcLabels := service.Attributes.Labels
if _, ok := svcLabels[model.IstioCanonicalServiceLabelName]; ok {
labels = map[string]string{
model.IstioCanonicalServiceLabelName: svcLabels[model.IstioCanonicalServiceLabelName],
model.IstioCanonicalServiceRevisionLabelName: svcLabels[model.IstioCanonicalServiceRevisionLabelName],
}
for k, v := range instance.Endpoint.Labels {
labels[k] = v
}
}
}
ep.Metadata = util.BuildLbEndpointMetadata(instance.Endpoint.Network, instance.Endpoint.TLSMode, instance.Endpoint.WorkloadName,
ns, instance.Endpoint.Locality.ClusterID, labels)
locality := instance.Endpoint.Locality.Label
lbEndpoints[locality] = append(lbEndpoints[locality], ep)
}
localityLbEndpoints := make([]*endpoint.LocalityLbEndpoints, 0, len(lbEndpoints))
locs := make([]string, 0, len(lbEndpoints))
for k := range lbEndpoints {
locs = append(locs, k)
}
if len(locs) >= 2 {
sort.Strings(locs)
}
for _, locality := range locs {
eps := lbEndpoints[locality]
var weight uint32
var overflowStatus bool
for _, ep := range eps {
weight, overflowStatus = addUint32(weight, ep.LoadBalancingWeight.GetValue())
}
if overflowStatus {
log.Warnf("Sum of localityLbEndpoints weight is overflow: service:%s, port: %d, locality:%s",
service.Hostname, port, locality)
}
localityLbEndpoints = append(localityLbEndpoints, &endpoint.LocalityLbEndpoints{
Locality: util.ConvertLocality(locality),
LbEndpoints: eps,
LoadBalancingWeight: &wrappers.UInt32Value{
Value: weight,
},
})
}
return localityLbEndpoints
}
// addUint32 returns the sum of two uint32 values and an overflow status.
// If the sum overflows, it returns math.MaxUint32 and true.
func addUint32(left, right uint32) (uint32, bool) {
if math.MaxUint32-right < left {
return math.MaxUint32, true
}
return left + right, false
}
// buildInboundPassthroughClusters builds passthrough clusters for inbound.
func (cb *ClusterBuilder) buildInboundPassthroughClusters() []*cluster.Cluster {
// IPv4 and IPv6 feature detection: Envoy cannot ignore a config where the IP version is not supported.
clusters := make([]*cluster.Cluster, 0, 2)
if cb.supportsIPv4 {
inboundPassthroughClusterIpv4 := cb.buildDefaultPassthroughCluster()
inboundPassthroughClusterIpv4.Name = util.InboundPassthroughClusterIpv4
inboundPassthroughClusterIpv4.Filters = nil
inboundPassthroughClusterIpv4.UpstreamBindConfig = &core.BindConfig{
SourceAddress: &core.SocketAddress{
Address: InboundPassthroughBindIpv4,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: uint32(0),
},
},
}
clusters = append(clusters, inboundPassthroughClusterIpv4)
}
if cb.supportsIPv6 {
inboundPassthroughClusterIpv6 := cb.buildDefaultPassthroughCluster()
inboundPassthroughClusterIpv6.Name = util.InboundPassthroughClusterIpv6
inboundPassthroughClusterIpv6.Filters = nil
inboundPassthroughClusterIpv6.UpstreamBindConfig = &core.BindConfig{
SourceAddress: &core.SocketAddress{
Address: InboundPassthroughBindIpv6,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: uint32(0),
},
},
}
clusters = append(clusters, inboundPassthroughClusterIpv6)
}
return clusters
}
// buildBlackHoleCluster generates a cluster that sends traffic to a dummy local port 0.
// This cluster is used to catch all traffic to unresolved destinations in a virtual service.
func (cb *ClusterBuilder) buildBlackHoleCluster() *cluster.Cluster {
c := &cluster.Cluster{
Name: util.BlackHoleCluster,
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_STATIC},
ConnectTimeout: cb.req.Push.Mesh.ConnectTimeout,
LbPolicy: cluster.Cluster_ROUND_ROBIN,
}
return c
}
// buildDefaultPassthroughCluster generates a cluster that sends traffic to the original destination.
// This cluster is used to catch all traffic to unknown listener ports.
func (cb *ClusterBuilder) buildDefaultPassthroughCluster() *cluster.Cluster {
cluster := &cluster.Cluster{
Name: util.PassthroughCluster,
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_ORIGINAL_DST},
ConnectTimeout: cb.req.Push.Mesh.ConnectTimeout,
LbPolicy: cluster.Cluster_CLUSTER_PROVIDED,
TypedExtensionProtocolOptions: map[string]*any.Any{
v3.HttpProtocolOptionsType: passthroughHttpProtocolOptions,
},
}
cb.applyConnectionPool(cb.req.Push.Mesh, NewMutableCluster(cluster), &networking.ConnectionPoolSettings{})
cb.applyMetadataExchange(cluster)
return cluster
}
// applyH2Upgrade upgrades the outbound cluster to http2 if specified by the configuration.
func (cb *ClusterBuilder) applyH2Upgrade(opts buildClusterOpts, connectionPool *networking.ConnectionPoolSettings) {
if cb.shouldH2Upgrade(opts.mutable.cluster.Name, opts.direction, opts.port, opts.mesh, connectionPool) {
cb.setH2Options(opts.mutable)
}
}
// shouldH2Upgrade returns true if the cluster should be upgraded to http2.
func (cb *ClusterBuilder) shouldH2Upgrade(clusterName string, direction model.TrafficDirection, port *model.Port, mesh *meshconfig.MeshConfig,
connectionPool *networking.ConnectionPoolSettings) bool {
if direction != model.TrafficDirectionOutbound {
return false
}
// TODO (mjog)
// Upgrade if tls.GetMode() == networking.TLSSettings_ISTIO_MUTUAL
override := networking.ConnectionPoolSettings_HTTPSettings_DEFAULT
if connectionPool != nil && connectionPool.Http != nil {
override = connectionPool.Http.H2UpgradePolicy
}
// If the user wants an upgrade at destination rule/port level, that means they are sure that
// it is an HTTP port - upgrade in that case. This is useful in case protocol sniffing is
// enabled and the user wants to upgrade/preserve the HTTP protocol from the client.
if override == networking.ConnectionPoolSettings_HTTPSettings_UPGRADE {
log.Debugf("Upgrading cluster: %v (%v %v)", clusterName, mesh.H2UpgradePolicy, override)
return true
}
// Do not upgrade non-http ports. This also ensures that we are only upgrading
// named ports so that protocol sniffing does not interfere. Protocol sniffing
// uses the downstream protocol; therefore, if the client upgrades the connection to http2,
// the server will send an h2 stream to the application, even though the application only
// supports http 1.1.
if port != nil && !port.Protocol.IsHTTP() {
return false
}
if !h2UpgradeMap[upgradeTuple{mesh.H2UpgradePolicy, override}] {
log.Debugf("Not upgrading cluster: %v (%v %v)", clusterName, mesh.H2UpgradePolicy, override)
return false
}
log.Debugf("Upgrading cluster: %v (%v %v)", clusterName, mesh.H2UpgradePolicy, override)
return true
}
// setH2Options makes the cluster an h2 cluster by setting http2ProtocolOptions.
func (cb *ClusterBuilder) setH2Options(mc *MutableCluster) {
if mc == nil {
return
}
if mc.httpProtocolOptions == nil {
mc.httpProtocolOptions = &http.HttpProtocolOptions{}
}
options := mc.httpProtocolOptions
if options.UpstreamHttpProtocolOptions == nil {
options.UpstreamProtocolOptions = &http.HttpProtocolOptions_ExplicitHttpConfig_{
ExplicitHttpConfig: &http.HttpProtocolOptions_ExplicitHttpConfig{
ProtocolConfig: &http.HttpProtocolOptions_ExplicitHttpConfig_Http2ProtocolOptions{
Http2ProtocolOptions: http2ProtocolOptions(),
},
},
}
}
}
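// applyTrafficPolicy applies the connection pool, H2 upgrade, outlier detection, load balancer,
// and upstream TLS settings from the resolved traffic policy to the cluster being built.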
func (cb *ClusterBuilder) applyTrafficPolicy(opts buildClusterOpts) {
connectionPool, outlierDetection, loadBalancer, tls := selectTrafficPolicyComponents(opts.policy)
// Connection pool settings are applicable for both inbound and outbound clusters.
if connectionPool == nil {
connectionPool = &networking.ConnectionPoolSettings{}
}
cb.applyConnectionPool(opts.mesh, opts.mutable, connectionPool)
if opts.direction != model.TrafficDirectionInbound {
cb.applyH2Upgrade(opts, connectionPool)
applyOutlierDetection(opts.mutable.cluster, outlierDetection)
applyLoadBalancer(opts.mutable.cluster, loadBalancer, opts.port, cb.locality, cb.proxyLabels, opts.mesh)
if opts.clusterMode != SniDnatClusterMode {
autoMTLSEnabled := opts.mesh.GetEnableAutoMtls().Value
tls, mtlsCtxType := cb.buildAutoMtlsSettings(tls, opts.serviceAccounts, opts.istioMtlsSni,
autoMTLSEnabled, opts.meshExternal, opts.serviceMTLSMode)
cb.applyUpstreamTLSSettings(&opts, tls, mtlsCtxType)
}
}
if opts.mutable.cluster.GetType() == cluster.Cluster_ORIGINAL_DST {
opts.mutable.cluster.LbPolicy = cluster.Cluster_CLUSTER_PROVIDED
}
}
// buildAutoMtlsSettings fills key cert fields for all TLSSettings when the mode is `ISTIO_MUTUAL`.
// If the (input) TLS setting is nil (i.e. not set), *and* the service mTLS mode is STRICT, it also
// creates and populates the config as if they are set as ISTIO_MUTUAL.
func (cb *ClusterBuilder) buildAutoMtlsSettings(
tls *networking.ClientTLSSettings,
serviceAccounts []string,
sni string,
autoMTLSEnabled bool,
meshExternal bool,
serviceMTLSMode model.MutualTLSMode) (*networking.ClientTLSSettings, mtlsContextType) {
if tls != nil {
if tls.Mode == networking.ClientTLSSettings_DISABLE || tls.Mode == networking.ClientTLSSettings_SIMPLE {
return tls, userSupplied
}
// For backward compatibility, use metadata certs if provided.
if cb.hasMetadataCerts() {
// When building Mutual TLS settings, we should always use user supplied SubjectAltNames and SNI
// in destination rule. The Service Accounts and auto computed SNI should only be used for
// ISTIO_MUTUAL.
return cb.buildMutualTLS(tls.SubjectAltNames, tls.Sni), userSupplied
}
if tls.Mode != networking.ClientTLSSettings_ISTIO_MUTUAL {
return tls, userSupplied
}
// Update TLS settings for ISTIO_MUTUAL. Use client provided SNI if set. Otherwise,
// overwrite with the auto generated SNI. User specified SNIs in the istio mtls settings
// are useful when routing via gateways. Use Service Accounts if Subject Alt names
// are not specified in TLS settings.
sniToUse := tls.Sni
if len(sniToUse) == 0 {
sniToUse = sni
}
subjectAltNamesToUse := tls.SubjectAltNames
if subjectAltNamesToUse == nil {
subjectAltNamesToUse = serviceAccounts
}
return cb.buildIstioMutualTLS(subjectAltNamesToUse, sniToUse), userSupplied
}
if meshExternal || !autoMTLSEnabled || serviceMTLSMode == model.MTLSUnknown || serviceMTLSMode == model.MTLSDisable {
return nil, userSupplied
}
// For backward compatibility, use metadata certs if provided.
if cb.hasMetadataCerts() {
return cb.buildMutualTLS(serviceAccounts, sni), autoDetected
}
// Build settings for auto MTLS.
return cb.buildIstioMutualTLS(serviceAccounts, sni), autoDetected
}
func (cb *ClusterBuilder) hasMetadataCerts() bool {
return cb.metadataCerts != nil
}
type mtlsContextType int
const (
userSupplied mtlsContextType = iota
autoDetected
)
// buildMutualTLS returns a `TLSSettings` for MUTUAL mode with proxy metadata certificates.
func (cb *ClusterBuilder) buildMutualTLS(serviceAccounts []string, sni string) *networking.ClientTLSSettings {
return &networking.ClientTLSSettings{
Mode: networking.ClientTLSSettings_MUTUAL,
CaCertificates: cb.metadataCerts.tlsClientRootCert,
ClientCertificate: cb.metadataCerts.tlsClientCertChain,
PrivateKey: cb.metadataCerts.tlsClientKey,
SubjectAltNames: serviceAccounts,
Sni: sni,
}
}
// buildIstioMutualTLS returns a `TLSSettings` for ISTIO_MUTUAL mode.
func (cb *ClusterBuilder) buildIstioMutualTLS(san []string, sni string) *networking.ClientTLSSettings {
return &networking.ClientTLSSettings{
Mode: networking.ClientTLSSettings_ISTIO_MUTUAL,
SubjectAltNames: san,
Sni: sni,
}
}
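// applyDefaultConnectionPool sets the mesh-wide default connect timeout on the cluster.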
func (cb *ClusterBuilder) applyDefaultConnectionPool(cluster *cluster.Cluster) {
cluster.ConnectTimeout = cb.req.Push.Mesh.ConnectTimeout
}
// FIXME: there isn't a way to distinguish between unset values and zero values
func (cb *ClusterBuilder) applyConnectionPool(mesh *meshconfig.MeshConfig, mc *MutableCluster, settings *networking.ConnectionPoolSettings) {
if settings == nil {
return
}
threshold := getDefaultCircuitBreakerThresholds()
var idleTimeout *durationpb.Duration
var maxRequestsPerConnection uint32
if settings.Http != nil {
if settings.Http.Http2MaxRequests > 0 {
// Envoy only applies MaxRequests in HTTP/2 clusters
threshold.MaxRequests = &wrappers.UInt32Value{Value: uint32(settings.Http.Http2MaxRequests)}
}
if settings.Http.Http1MaxPendingRequests > 0 {
// Envoy only applies MaxPendingRequests in HTTP/1.1 clusters
threshold.MaxPendingRequests = &wrappers.UInt32Value{Value: uint32(settings.Http.Http1MaxPendingRequests)}
}
// FIXME: zero is a valid value if explicitly set, otherwise we want to use the default
if settings.Http.MaxRetries > 0 {
threshold.MaxRetries = &wrappers.UInt32Value{Value: uint32(settings.Http.MaxRetries)}
}
idleTimeout = settings.Http.IdleTimeout
maxRequestsPerConnection = uint32(settings.Http.MaxRequestsPerConnection)
}
cb.applyDefaultConnectionPool(mc.cluster)
if settings.Tcp != nil {
if settings.Tcp.ConnectTimeout != nil {
mc.cluster.ConnectTimeout = settings.Tcp.ConnectTimeout
}
if settings.Tcp.MaxConnections > 0 {
threshold.MaxConnections = &wrappers.UInt32Value{Value: uint32(settings.Tcp.MaxConnections)}
}
}
applyTCPKeepalive(mesh, mc.cluster, settings.Tcp)
mc.cluster.CircuitBreakers = &cluster.CircuitBreakers{
Thresholds: []*cluster.CircuitBreakers_Thresholds{threshold},
}
if idleTimeout != nil || maxRequestsPerConnection > 0 {
if mc.httpProtocolOptions == nil {
mc.httpProtocolOptions = &http.HttpProtocolOptions{}
}
commonOptions := mc.httpProtocolOptions
if commonOptions.CommonHttpProtocolOptions == nil {
commonOptions.CommonHttpProtocolOptions = &core.HttpProtocolOptions{}
}
if idleTimeout != nil {
commonOptions.CommonHttpProtocolOptions.IdleTimeout = idleTimeout
}
if maxRequestsPerConnection > 0 {
commonOptions.CommonHttpProtocolOptions.MaxRequestsPerConnection = &wrappers.UInt32Value{Value: maxRequestsPerConnection}
}
}
if settings.Http != nil && settings.Http.UseClientProtocol {
// Use the downstream protocol: if the incoming traffic uses HTTP/1.1, the
// upstream cluster will use HTTP/1.1; if the incoming traffic uses HTTP/2,
// the upstream cluster will use HTTP/2.
cb.setUseDownstreamProtocol(mc)
}
}
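// applyUpstreamTLSSettings builds the upstream TLS context from the given TLS settings and attaches
// it to the cluster as a transport socket, or as transport socket matches when auto mTLS is detected.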
func (cb *ClusterBuilder) applyUpstreamTLSSettings(opts *buildClusterOpts, tls *networking.ClientTLSSettings, mtlsCtxType mtlsContextType) {
if tls == nil {
return
}
c := opts.mutable
tlsContext, err := cb.buildUpstreamClusterTLSContext(opts, tls)
if err != nil {
log.Errorf("failed to build Upstream TLSContext: %s", err.Error())
return
}
if tlsContext != nil {
c.cluster.TransportSocket = &core.TransportSocket{
Name: util.EnvoyTLSSocketName,
ConfigType: &core.TransportSocket_TypedConfig{TypedConfig: util.MessageToAny(tlsContext)},
}
}
// For headless services, the discovery type will be `Cluster_ORIGINAL_DST`.
// Apply auto mTLS to clusters, excluding this kind of headless service.
if c.cluster.GetType() != cluster.Cluster_ORIGINAL_DST {
// convert to transport socket matcher if the mode was auto detected
if tls.Mode == networking.ClientTLSSettings_ISTIO_MUTUAL && mtlsCtxType == autoDetected {
transportSocket := c.cluster.TransportSocket
c.cluster.TransportSocket = nil
c.cluster.TransportSocketMatches = []*cluster.Cluster_TransportSocketMatch{
{
Name: "tlsMode-" + model.IstioMutualTLSModeLabel,
Match: istioMtlsTransportSocketMatch,
TransportSocket: transportSocket,
},
defaultTransportSocketMatch(),
}
}
}
}
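// buildUpstreamClusterTLSContext constructs the UpstreamTlsContext for the cluster according to the
// mode of the client TLS settings (DISABLE, ISTIO_MUTUAL, SIMPLE, or MUTUAL).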
func (cb *ClusterBuilder) buildUpstreamClusterTLSContext(opts *buildClusterOpts, tls *networking.ClientTLSSettings) (*auth.UpstreamTlsContext, error) {
// Hack to avoid egress sds cluster config generation for sidecar when
// CredentialName is set in DestinationRule without a workloadSelector.
// We do not want to support CredentialName setting in non workloadSelector based DestinationRules, because
// that would result in the CredentialName being supplied to all the sidecars which the DestinationRule is scoped to,
// resulting in delayed startup of sidecars who do not have access to the credentials.
if tls.CredentialName != "" && cb.sidecarProxy() && !opts.isDrWithSelector {
if tls.Mode == networking.ClientTLSSettings_SIMPLE || tls.Mode == networking.ClientTLSSettings_MUTUAL {
return nil, nil
}
}
c := opts.mutable
var tlsContext *auth.UpstreamTlsContext
switch tls.Mode {
case networking.ClientTLSSettings_DISABLE:
tlsContext = nil
case networking.ClientTLSSettings_ISTIO_MUTUAL:
tlsContext = &auth.UpstreamTlsContext{
CommonTlsContext: defaultUpstreamCommonTLSContext(),
Sni: tls.Sni,
}
tlsContext.CommonTlsContext.TlsCertificateSdsSecretConfigs = append(tlsContext.CommonTlsContext.TlsCertificateSdsSecretConfigs,
authn_model.ConstructSdsSecretConfig(authn_model.SDSDefaultResourceName))
tlsContext.CommonTlsContext.ValidationContextType = &auth.CommonTlsContext_CombinedValidationContext{
CombinedValidationContext: &auth.CommonTlsContext_CombinedCertificateValidationContext{
DefaultValidationContext: &auth.CertificateValidationContext{MatchSubjectAltNames: util.StringToExactMatch(tls.SubjectAltNames)},
ValidationContextSdsSecretConfig: authn_model.ConstructSdsSecretConfig(authn_model.SDSRootResourceName),
},
}
// Set default SNI of cluster name for istio_mutual if sni is not set.
if len(tlsContext.Sni) == 0 {
tlsContext.Sni = c.cluster.Name
}
// `istio-peer-exchange` alpn is only used when using mtls communication between peers.
// We add `istio-peer-exchange` to the list of alpn strings.
// The code has repeated snippets because we want to use predefined alpn strings for efficiency.
if cb.IsHttp2Cluster(c) {
// This is HTTP/2 in-mesh cluster, advertise it with ALPN.
if features.MetadataExchange {
tlsContext.CommonTlsContext.AlpnProtocols = util.ALPNInMeshH2WithMxc
} else {
tlsContext.CommonTlsContext.AlpnProtocols = util.ALPNInMeshH2
}
} else {
// This is in-mesh cluster, advertise it with ALPN.
if features.MetadataExchange {
tlsContext.CommonTlsContext.AlpnProtocols = util.ALPNInMeshWithMxc
} else {
tlsContext.CommonTlsContext.AlpnProtocols = util.ALPNInMesh
}
}
case networking.ClientTLSSettings_SIMPLE:
tlsContext = &auth.UpstreamTlsContext{
CommonTlsContext: defaultUpstreamCommonTLSContext(),
Sni: tls.Sni,
}
cb.setAutoSniAndAutoSanValidation(c, tls)
// Use the subject alt names specified in the service entry if the TLS settings do not have subject alt names.
if opts.serviceRegistry == provider.External && len(tls.SubjectAltNames) == 0 {
tls.SubjectAltNames = opts.serviceAccounts
}
if tls.CredentialName != "" {
// If credential name is specified at Destination Rule config and originating node is egress gateway, create
// SDS config for egress gateway to fetch key/cert at gateway agent.
authn_model.ApplyCustomSDSToClientCommonTLSContext(tlsContext.CommonTlsContext, tls)
} else {
// If CredentialName is not set, fall back to the files specified in the DR.
res := security.SdsCertificateConfig{
CaCertificatePath: tls.CaCertificates,
}
// If tls.CaCertificates or the CaCertificate in Metadata isn't configured, don't set up the SdsSecretConfig.
if !res.IsRootCertificate() {
tlsContext.CommonTlsContext.ValidationContextType = &auth.CommonTlsContext_ValidationContext{}
} else {
tlsContext.CommonTlsContext.ValidationContextType = &auth.CommonTlsContext_CombinedValidationContext{
CombinedValidationContext: &auth.CommonTlsContext_CombinedCertificateValidationContext{
DefaultValidationContext: &auth.CertificateValidationContext{MatchSubjectAltNames: util.StringToExactMatch(tls.SubjectAltNames)},
ValidationContextSdsSecretConfig: authn_model.ConstructSdsSecretConfig(res.GetRootResourceName()),
},
}
}
}
if cb.IsHttp2Cluster(c) {
// This is HTTP/2 cluster, advertise it with ALPN.
tlsContext.CommonTlsContext.AlpnProtocols = util.ALPNH2Only
}
case networking.ClientTLSSettings_MUTUAL:
tlsContext = &auth.UpstreamTlsContext{
CommonTlsContext: defaultUpstreamCommonTLSContext(),
Sni: tls.Sni,
}
cb.setAutoSniAndAutoSanValidation(c, tls)
// Use the subject alt names specified in the service entry if the TLS settings do not have subject alt names.
if opts.serviceRegistry == provider.External && len(tls.SubjectAltNames) == 0 {
tls.SubjectAltNames = opts.serviceAccounts
}
if tls.CredentialName != "" {
// If credential name is specified at Destination Rule config and originating node is egress gateway, create
// SDS config for egress gateway to fetch key/cert at gateway agent.
authn_model.ApplyCustomSDSToClientCommonTLSContext(tlsContext.CommonTlsContext, tls)
} else {
// If CredentialName is not set, fall back to the file-based approach.
if tls.ClientCertificate == "" || tls.PrivateKey == "" {
err := fmt.Errorf("failed to apply tls setting for %s: client certificate and private key must not be empty",
c.cluster.Name)
return nil, err
}
// These are certs being mounted from within the pod and specified in Destination Rules.
// Rather than reading directly in Envoy, which does not support rotation, we will
// serve them over SDS by reading the files.
res := security.SdsCertificateConfig{
CertificatePath: tls.ClientCertificate,
PrivateKeyPath: tls.PrivateKey,
CaCertificatePath: tls.CaCertificates,
}
tlsContext.CommonTlsContext.TlsCertificateSdsSecretConfigs = append(tlsContext.CommonTlsContext.TlsCertificateSdsSecretConfigs,
authn_model.ConstructSdsSecretConfig(res.GetResourceName()))
// If tls.CaCertificates or the CaCertificate in Metadata isn't configured, don't set up the RootSdsSecretConfig.
if !res.IsRootCertificate() {
tlsContext.CommonTlsContext.ValidationContextType = &auth.CommonTlsContext_ValidationContext{}
} else {
tlsContext.CommonTlsContext.ValidationContextType = &auth.CommonTlsContext_CombinedValidationContext{
CombinedValidationContext: &auth.CommonTlsContext_CombinedCertificateValidationContext{
DefaultValidationContext: &auth.CertificateValidationContext{MatchSubjectAltNames: util.StringToExactMatch(tls.SubjectAltNames)},
ValidationContextSdsSecretConfig: authn_model.ConstructSdsSecretConfig(res.GetRootResourceName()),
},
}
}
}
if cb.IsHttp2Cluster(c) {
// This is HTTP/2 cluster, advertise it with ALPN.
tlsContext.CommonTlsContext.AlpnProtocols = util.ALPNH2Only
}
}
return tlsContext, nil
}
// Set auto_sni if EnableAutoSni feature flag is enabled and if sni field is not explicitly set in DR.
// Set auto_san_validation if VerifyCertAtClient feature flag is enabled and if there is no explicit SubjectAltNames specified in DR.
func (cb *ClusterBuilder) setAutoSniAndAutoSanValidation(mc *MutableCluster, tls *networking.ClientTLSSettings) {
if mc == nil || !features.EnableAutoSni {
return
}
setAutoSni := false
setAutoSanValidation := false
if len(tls.Sni) == 0 {
setAutoSni = true
}
if features.VerifyCertAtClient && len(tls.SubjectAltNames) == 0 {
setAutoSanValidation = true
}
if setAutoSni || setAutoSanValidation {
if mc.httpProtocolOptions == nil {
mc.httpProtocolOptions = &http.HttpProtocolOptions{}
}
if mc.httpProtocolOptions.UpstreamHttpProtocolOptions == nil {
mc.httpProtocolOptions.UpstreamHttpProtocolOptions = &core.UpstreamHttpProtocolOptions{}
}
if setAutoSni {
mc.httpProtocolOptions.UpstreamHttpProtocolOptions.AutoSni = true
}
if setAutoSanValidation {
mc.httpProtocolOptions.UpstreamHttpProtocolOptions.AutoSanValidation = true
}
}
}
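// setUseDownstreamProtocol configures the cluster to mirror the protocol (HTTP/1.1 or HTTP/2) used by the downstream connection.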
func (cb *ClusterBuilder) setUseDownstreamProtocol(mc *MutableCluster) {
if mc.httpProtocolOptions == nil {
mc.httpProtocolOptions = &http.HttpProtocolOptions{}
}
options := mc.httpProtocolOptions
options.UpstreamProtocolOptions = &http.HttpProtocolOptions_UseDownstreamProtocolConfig{
UseDownstreamProtocolConfig: &http.HttpProtocolOptions_UseDownstreamHttpConfig{
HttpProtocolOptions: &core.Http1ProtocolOptions{},
Http2ProtocolOptions: http2ProtocolOptions(),
},
}
}
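// http2ProtocolOptions returns the default HTTP/2 options applied to upstream clusters.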
func http2ProtocolOptions() *core.Http2ProtocolOptions {
return &core.Http2ProtocolOptions{
// Envoy default value of 100 is too low for data path.
MaxConcurrentStreams: &wrappers.UInt32Value{
Value: 1073741824,
},
}
}
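// IsHttp2Cluster returns true if the cluster is explicitly configured with HTTP/2 protocol options.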
// nolint
func (cb *ClusterBuilder) IsHttp2Cluster(mc *MutableCluster) bool {
options := mc.httpProtocolOptions
return options != nil && options.GetExplicitHttpConfig().GetHttp2ProtocolOptions() != nil
}
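// setUpstreamProtocol selects the upstream protocol for the cluster: h2 for HTTP/2 ports and,
// for sidecars with protocol sniffing enabled, use_downstream_protocol (see the comment below).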
func (cb *ClusterBuilder) setUpstreamProtocol(mc *MutableCluster, port *model.Port, direction model.TrafficDirection) {
if port.Protocol.IsHTTP2() {
cb.setH2Options(mc)
return
}
// Add use_downstream_protocol for sidecar proxies only if protocol sniffing is enabled. Protocol
// detection is disabled for gateways, and use_downstream_protocol is only relevant under protocol
// detection, where it lets a cluster select the upstream connection protocol when the service port
// is unnamed. use_downstream_protocol should be disabled for gateways; while it sort of makes sense there, even
// without sniffing, a concern is that clients will do ALPN negotiation, and we always advertise
// h2. Clients would then connect with h2, while the upstream may not support it. This is not a
// concern for plaintext, but we do not have a way to distinguish https vs http here. If users of
// gateway want this behavior, they can configure UseClientProtocol explicitly.
if cb.sidecarProxy() && ((util.IsProtocolSniffingEnabledForInboundPort(port) && direction == model.TrafficDirectionInbound) ||
(util.IsProtocolSniffingEnabledForOutboundPort(port) && direction == model.TrafficDirectionOutbound)) {
// Use the downstream protocol: if the incoming traffic uses HTTP/1.1, the
// upstream cluster will use HTTP/1.1; if the incoming traffic uses HTTP/2,
// the upstream cluster will use HTTP/2.
cb.setUseDownstreamProtocol(mc)
}
}
// normalizeClusters normalizes clusters to avoid duplicate clusters. This should be called
// at the end before adding the cluster to list of clusters.
func (cb *ClusterBuilder) normalizeClusters(clusters []*discovery.Resource) []*discovery.Resource {
// Resolve cluster name conflicts. There can be duplicate cluster names if there are conflicting service definitions.
// For any clusters that share the same name, the first cluster is kept and the others are discarded.
have := sets.Set{}
out := make([]*discovery.Resource, 0, len(clusters))
for _, c := range clusters {
if !have.Contains(c.Name) {
out = append(out, c)
} else {
cb.req.Push.AddMetric(model.DuplicatedClusters, c.Name, cb.proxyID,
fmt.Sprintf("Duplicate cluster %s found while pushing CDS", c.Name))
}
have.Insert(c.Name)
}
return out
}
// getAllCachedSubsetClusters either fetches all cached clusters for a given key (there may be multiple due to subsets)
// and returns them along with allFound=true, or returns allFound=false indicating a cache miss. In either case,
// the cache tokens are returned to allow future writes to the cache.
// This code will only trigger a cache hit if all subset clusters are present. This simplifies the code a bit,
// as the non-subset and subset cluster generation are tightly coupled, in exchange for a likely trivial cache hit rate impact.
func (cb *ClusterBuilder) getAllCachedSubsetClusters(clusterKey clusterCache) ([]*discovery.Resource, bool) {
if !features.EnableCDSCaching {
return nil, false
}
destinationRule := CastDestinationRule(clusterKey.destinationRule)
res := make([]*discovery.Resource, 0, 1+len(destinationRule.GetSubsets()))
cachedCluster, f := cb.cache.Get(&clusterKey)
allFound := f
res = append(res, cachedCluster)
dir, _, host, port := model.ParseSubsetKey(clusterKey.clusterName)
for _, ss := range destinationRule.GetSubsets() {
clusterKey.clusterName = model.BuildSubsetKey(dir, ss.Name, host, port)
cachedCluster, f := cb.cache.Get(&clusterKey)
if !f {
allFound = false
}
res = append(res, cachedCluster)
}
return res, allFound
}
// build does any final build operations needed, like marshaling.
func (mc *MutableCluster) build() *cluster.Cluster {
if mc == nil {
return nil
}
// Marshal the HTTP protocol options if they exist.
if mc.httpProtocolOptions != nil {
// UpstreamProtocolOptions is a required field in Envoy. If we have not set this option
// earlier, we need to set it to the default HTTP protocol options.
if mc.httpProtocolOptions.UpstreamProtocolOptions == nil {
mc.httpProtocolOptions.UpstreamProtocolOptions = &http.HttpProtocolOptions_ExplicitHttpConfig_{
ExplicitHttpConfig: &http.HttpProtocolOptions_ExplicitHttpConfig{
ProtocolConfig: &http.HttpProtocolOptions_ExplicitHttpConfig_HttpProtocolOptions{},
},
}
}
mc.cluster.TypedExtensionProtocolOptions = map[string]*any.Any{
v3.HttpProtocolOptionsType: util.MessageToAny(mc.httpProtocolOptions),
}
}
return mc.cluster
}
// CastDestinationRule returns the destination rule enclosed by the config, if not nil.
// Otherwise, return nil.
func CastDestinationRule(config *config.Config) *networking.DestinationRule {
if config != nil {
return config.Spec.(*networking.DestinationRule)
}
return nil
}
// maybeApplyEdsConfig applies EdsClusterConfig on the passed in cluster if it is an EDS type of cluster.
func maybeApplyEdsConfig(c *cluster.Cluster) {
if c.GetType() != cluster.Cluster_EDS {
return
}
c.EdsClusterConfig = &cluster.Cluster_EdsClusterConfig{
ServiceName: c.Name,
EdsConfig: &core.ConfigSource{
ConfigSourceSpecifier: &core.ConfigSource_Ads{
Ads: &core.AggregatedConfigSource{},
},
InitialFetchTimeout: durationpb.New(0),
ResourceApiVersion: core.ApiVersion_V3,
},
}
}
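// defaultUpstreamCommonTLSContext returns a CommonTlsContext allowing TLS versions 1.2 through 1.3.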
func defaultUpstreamCommonTLSContext() *auth.CommonTlsContext {
return &auth.CommonTlsContext{
TlsParams: &auth.TlsParameters{
// If not specified, Envoy uses TLSv1_2 as the default for clients.
TlsMaximumProtocolVersion: auth.TlsParameters_TLSv1_3,
TlsMinimumProtocolVersion: auth.TlsParameters_TLSv1_2,
},
}
}
// defaultTransportSocketMatch applies to endpoints that have no security.istio.io/tlsMode label
// or those whose label value does not match "istio".
func defaultTransportSocketMatch() *cluster.Cluster_TransportSocketMatch {
return &cluster.Cluster_TransportSocketMatch{
Name: "tlsMode-disabled",
Match: &structpb.Struct{},
TransportSocket: xdsfilters.RawBufferTransportSocket,
}
}