// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds
import (
"crypto/md5"
"encoding/hex"
"sort"
"strconv"
)
import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"google.golang.org/protobuf/proto"
wrappers "google.golang.org/protobuf/types/known/wrapperspb"
networkingapi "istio.io/api/networking/v1alpha3"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/security/authn/factory"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
"github.com/apache/dubbo-go-pixiu/pkg/network"
)
// GetTunnelBuilderType returns the tunnel type for this endpoint builder. If the endpoint builder builds an H2 tunnel,
// the final endpoint collection includes only the endpoints that support the H2 tunnel plus the non-tunnel endpoints.
// The latter are kept to support multi-cluster services.
// Revisit the non-tunnel endpoint decision once gateways support tunneling.
// TODO(lambdai): Propose to the Istio API.
func GetTunnelBuilderType(_ string, proxy *model.Proxy, _ *model.PushContext) networking.TunnelType {
if proxy == nil || proxy.Metadata == nil || proxy.Metadata.ProxyConfig == nil {
return networking.NoTunnel
}
if outTunnel, ok := proxy.Metadata.ProxyConfig.ProxyMetadata["tunnel"]; ok {
switch outTunnel {
case networking.H2TunnelTypeName:
return networking.H2Tunnel
default:
// passthrough
}
}
return networking.NoTunnel
}
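// EndpointBuilder builds the endpoints (ClusterLoadAssignment) for a single cluster as seen by a given proxy.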
type EndpointBuilder struct {
// These fields define the primary key for an endpoint, and can be used as a cache key
clusterName string
network network.ID
proxyView model.ProxyView
clusterID cluster.ID
locality *core.Locality
destinationRule *config.Config
service *model.Service
clusterLocal bool
tunnelType networking.TunnelType
// These fields are provided for convenience only
subsetName string
hostname host.Name
port int
push *model.PushContext
proxy *model.Proxy
mtlsChecker *mtlsChecker
}
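// NewEndpointBuilder constructs an EndpointBuilder for the given cluster name, resolving the service,
// destination rule, and tunnel type from the proxy and push context.
//
// A minimal usage sketch (variable names are illustrative only):
//
//	builder := NewEndpointBuilder("outbound|8080||reviews.default.svc.cluster.local", proxy, push)
//	if builder.Cacheable() {
//		key := builder.Key()
//		_ = key // look up or store EDS output under this key
//	}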
func NewEndpointBuilder(clusterName string, proxy *model.Proxy, push *model.PushContext) EndpointBuilder {
_, subsetName, hostname, port := model.ParseSubsetKey(clusterName)
svc := push.ServiceForHostname(proxy, hostname)
var dr *config.Config
if svc != nil {
dr = proxy.SidecarScope.DestinationRule(model.TrafficDirectionOutbound, proxy, svc.Hostname)
}
b := EndpointBuilder{
clusterName: clusterName,
network: proxy.Metadata.Network,
proxyView: proxy.GetView(),
clusterID: proxy.Metadata.ClusterID,
locality: proxy.Locality,
service: svc,
clusterLocal: push.IsClusterLocal(svc),
destinationRule: dr,
tunnelType: GetTunnelBuilderType(clusterName, proxy, push),
push: push,
proxy: proxy,
subsetName: subsetName,
hostname: hostname,
port: port,
}
// We need this for multi-network, or for clusters meant for use with AUTO_PASSTHROUGH.
if features.EnableAutomTLSCheckPolicies ||
b.push.NetworkManager().IsMultiNetworkEnabled() || model.IsDNSSrvSubsetKey(clusterName) {
b.mtlsChecker = newMtlsChecker(push, port, dr)
}
return b
}
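// DestinationRule returns the destination rule spec backing this builder, or nil if none applies.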
func (b EndpointBuilder) DestinationRule() *networkingapi.DestinationRule {
if b.destinationRule == nil {
return nil
}
return b.destinationRule.Spec.(*networkingapi.DestinationRule)
}
// Key provides the EDS cache key and should include any information that could change the way endpoints are generated.
func (b EndpointBuilder) Key() string {
params := []string{
b.clusterName,
string(b.network),
string(b.clusterID),
strconv.FormatBool(b.clusterLocal),
util.LocalityToString(b.locality),
b.tunnelType.ToString(),
}
if b.push != nil && b.push.AuthnPolicies != nil {
params = append(params, b.push.AuthnPolicies.GetVersion())
}
if b.destinationRule != nil {
params = append(params, b.destinationRule.Name+"/"+b.destinationRule.Namespace)
}
if b.service != nil {
params = append(params, string(b.service.Hostname)+"/"+b.service.Attributes.Namespace)
}
if b.proxyView != nil {
params = append(params, b.proxyView.String())
}
hash := md5.New()
for _, param := range params {
hash.Write([]byte(param))
}
sum := hash.Sum(nil)
return hex.EncodeToString(sum)
}
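// Cacheable reports whether the generated endpoints can be stored in the EDS cache.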
func (b EndpointBuilder) Cacheable() bool {
// If the service is not defined, we cannot do any caching as we will not have a way to
// invalidate the results.
// A nil service means the EDS output will be empty anyway, so not much is lost here.
return b.service != nil
}
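// DependentConfigs returns the specific configs (DestinationRule and ServiceEntry) whose changes invalidate this builder's cached endpoints.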
func (b EndpointBuilder) DependentConfigs() []model.ConfigKey {
configs := []model.ConfigKey{}
if b.destinationRule != nil {
configs = append(configs, model.ConfigKey{Kind: gvk.DestinationRule, Name: b.destinationRule.Name, Namespace: b.destinationRule.Namespace})
}
if b.service != nil {
configs = append(configs, model.ConfigKey{Kind: gvk.ServiceEntry, Name: string(b.service.Hostname), Namespace: b.service.Attributes.Namespace})
}
return configs
}
var edsDependentTypes = []config.GroupVersionKind{gvk.PeerAuthentication}
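// DependentTypes returns config kinds whose changes invalidate cached endpoints regardless of the specific resource.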
func (b EndpointBuilder) DependentTypes() []config.GroupVersionKind {
return edsDependentTypes
}
// TODO(lambdai): Receive the port value (15009 by default) so the builder covers a wider range of cases.
type EndpointTunnelApplier interface {
// ApplyTunnel applies the tunnel setting to the LbEndpoint, returning either the original or a modified copy. It returns a non-nil error on failure.
ApplyTunnel(lep *endpoint.LbEndpoint, tunnelType networking.TunnelType) (*endpoint.LbEndpoint, error)
}
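// EndpointNoTunnelApplier is an EndpointTunnelApplier that leaves the LbEndpoint unchanged.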
type EndpointNoTunnelApplier struct{}
// ApplyTunnel returns the endpoint unchanged. Note that it does not return an error even if another tunnel type is requested.
func (t *EndpointNoTunnelApplier) ApplyTunnel(lep *endpoint.LbEndpoint, _ networking.TunnelType) (*endpoint.LbEndpoint, error) {
return lep, nil
}
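// EndpointH2TunnelApplier is an EndpointTunnelApplier that redirects the endpoint to the tunnel port when the H2 tunnel is requested.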
type EndpointH2TunnelApplier struct{}
// TODO(lambdai): Set original port if the default cluster original port is not the same.
func (t *EndpointH2TunnelApplier) ApplyTunnel(lep *endpoint.LbEndpoint, tunnelType networking.TunnelType) (*endpoint.LbEndpoint, error) {
switch tunnelType {
case networking.H2Tunnel:
if ep := lep.GetEndpoint(); ep != nil {
if ep.Address.GetSocketAddress().GetPortValue() != 0 {
newEp := proto.Clone(lep).(*endpoint.LbEndpoint)
newEp.GetEndpoint().Address.GetSocketAddress().PortSpecifier = &core.SocketAddress_PortValue{
PortValue: 15009,
}
return newEp, nil
}
}
return lep, nil
case networking.NoTunnel:
return lep, nil
default:
panic("supported tunnel type")
}
}
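// LocLbEndpointsAndOptions pairs the LocalityLbEndpoints proto with its source IstioEndpoints and per-endpoint tunnel appliers, kept index-aligned.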
type LocLbEndpointsAndOptions struct {
istioEndpoints []*model.IstioEndpoint
// The protobuf message which contains LbEndpoint slice.
llbEndpoints endpoint.LocalityLbEndpoints
// The runtime information of the LbEndpoint slice. Each LbEndpoint has individual metadata at the same index.
tunnelMetadata []EndpointTunnelApplier
}
// MakeTunnelApplier returns a tunnel applier, preferring the H2 tunnel applier when the endpoint supports it.
func MakeTunnelApplier(_ *endpoint.LbEndpoint, tunnelOpt networking.TunnelAbility) EndpointTunnelApplier {
if tunnelOpt.SupportH2Tunnel() {
return &EndpointH2TunnelApplier{}
}
return &EndpointNoTunnelApplier{}
}
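// append adds an endpoint, its LbEndpoint proto, and the matching tunnel applier, keeping the slices index-aligned.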
func (e *LocLbEndpointsAndOptions) append(ep *model.IstioEndpoint, le *endpoint.LbEndpoint, tunnelOpt networking.TunnelAbility) {
e.istioEndpoints = append(e.istioEndpoints, ep)
e.llbEndpoints.LbEndpoints = append(e.llbEndpoints.LbEndpoints, le)
e.tunnelMetadata = append(e.tunnelMetadata, MakeTunnelApplier(le, tunnelOpt))
}
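// refreshWeight recomputes the locality-level load balancing weight as the sum of the individual endpoint weights.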
func (e *LocLbEndpointsAndOptions) refreshWeight() {
var weight *wrappers.UInt32Value
if len(e.llbEndpoints.LbEndpoints) == 0 {
weight = nil
} else {
weight = &wrappers.UInt32Value{}
for _, lbEp := range e.llbEndpoints.LbEndpoints {
weight.Value += lbEp.GetLoadBalancingWeight().Value
}
}
e.llbEndpoints.LoadBalancingWeight = weight
}
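// AssertInvarianceInTest panics if the LbEndpoints and tunnelMetadata slices are out of sync; intended for use in tests only.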
func (e *LocLbEndpointsAndOptions) AssertInvarianceInTest() {
if len(e.llbEndpoints.LbEndpoints) != len(e.tunnelMetadata) {
panic(" len(e.llbEndpoints.LbEndpoints) != len(e.tunnelMetadata)")
}
}
// buildLocalityLbEndpointsFromShards builds LocalityLbEndpoints for a cluster from existing EndpointShards.
func (b *EndpointBuilder) buildLocalityLbEndpointsFromShards(
shards *model.EndpointShards,
svcPort *model.Port,
) []*LocLbEndpointsAndOptions {
localityEpMap := make(map[string]*LocLbEndpointsAndOptions)
// get the subset labels
epLabels := getSubSetLabels(b.DestinationRule(), b.subsetName)
// Determine whether or not the target service is considered local to the cluster
// and should, therefore, not be accessed from outside the cluster.
isClusterLocal := b.clusterLocal
shards.Lock()
// Extract shard keys so we can iterate in order. This ensures a stable EDS output.
keys := shards.Keys()
// The shards are updated independently; now we need to filter and merge them for this cluster.
for _, shardKey := range keys {
endpoints := shards.Shards[shardKey]
// If the downstream service is configured as cluster-local, only include endpoints that
// reside in the same cluster.
if isClusterLocal && (shardKey.Cluster != b.clusterID) {
continue
}
for _, ep := range endpoints {
// TODO(nmittler): Consider merging discoverability policy with cluster-local
if !ep.IsDiscoverableFromProxy(b.proxy) {
continue
}
if svcPort.Name != ep.ServicePortName {
continue
}
// Filter out endpoints that do not match the subset labels.
if !epLabels.SubsetOf(ep.Labels) {
continue
}
locLbEps, found := localityEpMap[ep.Locality.Label]
if !found {
locLbEps = &LocLbEndpointsAndOptions{
llbEndpoints: endpoint.LocalityLbEndpoints{
Locality: util.ConvertLocality(ep.Locality.Label),
LbEndpoints: make([]*endpoint.LbEndpoint, 0, len(endpoints)),
},
tunnelMetadata: make([]EndpointTunnelApplier, 0, len(endpoints)),
}
localityEpMap[ep.Locality.Label] = locLbEps
}
if ep.EnvoyEndpoint == nil {
ep.EnvoyEndpoint = buildEnvoyLbEndpoint(ep)
}
// detect if mTLS is possible for this endpoint, used later during ep filtering
// this must be done while converting IstioEndpoints because we still have workload labels
if b.mtlsChecker != nil {
b.mtlsChecker.computeForEndpoint(ep)
if features.EnableAutomTLSCheckPolicies {
tlsMode := ep.TLSMode
if b.mtlsChecker.isMtlsDisabled(ep.EnvoyEndpoint) {
tlsMode = ""
}
if nep, modified := util.MaybeApplyTLSModeLabel(ep.EnvoyEndpoint, tlsMode); modified {
ep.EnvoyEndpoint = nep
}
}
}
locLbEps.append(ep, ep.EnvoyEndpoint, ep.TunnelAbility)
}
}
shards.Unlock()
locEps := make([]*LocLbEndpointsAndOptions, 0, len(localityEpMap))
locs := make([]string, 0, len(localityEpMap))
for k := range localityEpMap {
locs = append(locs, k)
}
if len(locs) >= 2 {
sort.Strings(locs)
}
for _, k := range locs {
locLbEps := localityEpMap[k]
var weight uint32
for _, ep := range locLbEps.llbEndpoints.LbEndpoints {
weight += ep.LoadBalancingWeight.GetValue()
}
locLbEps.llbEndpoints.LoadBalancingWeight = &wrappers.UInt32Value{
Value: weight,
}
locEps = append(locEps, locLbEps)
}
if len(locEps) == 0 {
b.push.AddMetric(model.ProxyStatusClusterNoInstances, b.clusterName, "", "")
}
return locEps
}
// TODO(lambdai): Handle the ApplyTunnel error return value by filtering out the failed endpoint.
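// ApplyTunnelSetting applies the requested tunnel type to each LbEndpoint via its recorded tunnel applier.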
func (b *EndpointBuilder) ApplyTunnelSetting(llbOpts []*LocLbEndpointsAndOptions, tunnelType networking.TunnelType) []*LocLbEndpointsAndOptions {
for _, llb := range llbOpts {
for i, ep := range llb.llbEndpoints.LbEndpoints {
newEp, err := llb.tunnelMetadata[i].ApplyTunnel(ep, tunnelType)
if err != nil {
panic("not implemented yet on failing to apply tunnel")
} else {
llb.llbEndpoints.LbEndpoints[i] = newEp
}
}
}
return llbOpts
}
// createClusterLoadAssignment creates the ClusterLoadAssignment. At this point the options must already have been applied to the locality LB endpoints.
func (b *EndpointBuilder) createClusterLoadAssignment(llbOpts []*LocLbEndpointsAndOptions) *endpoint.ClusterLoadAssignment {
llbEndpoints := make([]*endpoint.LocalityLbEndpoints, 0, len(llbOpts))
for _, l := range llbOpts {
llbEndpoints = append(llbEndpoints, &l.llbEndpoints)
}
return &endpoint.ClusterLoadAssignment{
ClusterName: b.clusterName,
Endpoints: llbEndpoints,
}
}
// buildEnvoyLbEndpoint builds an Envoy LbEndpoint from the Istio endpoint info.
func buildEnvoyLbEndpoint(e *model.IstioEndpoint) *endpoint.LbEndpoint {
addr := util.BuildAddress(e.Address, e.EndpointPort)
healthStatus := core.HealthStatus_HEALTHY
if e.HealthStatus == model.UnHealthy {
healthStatus = core.HealthStatus_UNHEALTHY
}
ep := &endpoint.LbEndpoint{
HealthStatus: healthStatus,
LoadBalancingWeight: &wrappers.UInt32Value{
Value: e.GetLoadBalancingWeight(),
},
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: addr,
},
},
}
// Istio telemetry depends on the metadata value being set for endpoints in the mesh.
// Istio endpoint-level TLS transport socket configuration also depends on this logic.
// Do not remove (see pilot/pkg/xds/fake.go).
ep.Metadata = util.BuildLbEndpointMetadata(e.Network, e.TLSMode, e.WorkloadName, e.Namespace, e.Locality.ClusterID, e.Labels)
return ep
}
// TODO this logic is probably done elsewhere in XDS, possible code-reuse + perf improvements
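// mtlsChecker determines whether mTLS is disabled for endpoints of a given service port, based on DestinationRules and PeerAuthentication policies.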
type mtlsChecker struct {
push *model.PushContext
svcPort int
destinationRule *networkingapi.DestinationRule
// cache of host identifiers that have mTLS disabled
mtlsDisabledHosts map[string]struct{}
// cache of labels/port that have mTLS disabled by peerAuthn
peerAuthDisabledMTLS map[string]bool
// cache of labels that have mTLS modes set for subset policies
subsetPolicyMode map[string]*networkingapi.ClientTLSSettings_TLSmode
// the tlsMode of the root traffic policy if it's set
rootPolicyMode *networkingapi.ClientTLSSettings_TLSmode
}
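// newMtlsChecker creates an mtlsChecker for the given push context, service port, and destination rule.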
func newMtlsChecker(push *model.PushContext, svcPort int, dr *config.Config) *mtlsChecker {
var drSpec *networkingapi.DestinationRule
if dr != nil {
drSpec = dr.Spec.(*networkingapi.DestinationRule)
}
return &mtlsChecker{
push: push,
svcPort: svcPort,
destinationRule: drSpec,
mtlsDisabledHosts: map[string]struct{}{},
peerAuthDisabledMTLS: map[string]bool{},
subsetPolicyMode: map[string]*networkingapi.ClientTLSSettings_TLSmode{},
rootPolicyMode: mtlsModeForDefaultTrafficPolicy(dr, svcPort),
}
}
// isMtlsDisabled returns true if the given lbEp has mTLS disabled due to any of:
// - disabled tlsMode
// - DestinationRule disabling mTLS on the entire host or the port
// - PeerAuthentication disabling mTLS at any applicable level (mesh, ns, workload, port)
func (c *mtlsChecker) isMtlsDisabled(lbEp *endpoint.LbEndpoint) bool {
if c == nil {
return false
}
_, ok := c.mtlsDisabledHosts[lbEpKey(lbEp)]
return ok
}
// computeForEndpoint checks destination rule, peer authentication and tls mode labels to determine if mTLS was turned off.
func (c *mtlsChecker) computeForEndpoint(ep *model.IstioEndpoint) {
if drMode := c.mtlsModeForDestinationRule(ep); drMode != nil {
switch *drMode {
case networkingapi.ClientTLSSettings_DISABLE:
c.mtlsDisabledHosts[lbEpKey(ep.EnvoyEndpoint)] = struct{}{}
return
case networkingapi.ClientTLSSettings_ISTIO_MUTUAL:
// do not mark this EP as disabled, even if PeerAuthentication or the tlsMode metadata would disable it
return
}
}
// The endpoint has no sidecar, or TLS is explicitly disabled via the "security.istio.io/tlsMode" label.
if ep.TLSMode != model.IstioMutualTLSModeLabel {
c.mtlsDisabledHosts[lbEpKey(ep.EnvoyEndpoint)] = struct{}{}
return
}
mtlsDisabledByPeerAuthentication := func(ep *model.IstioEndpoint) bool {
// apply any matching peer authentications
peerAuthnKey := ep.Labels.String() + ":" + strconv.Itoa(int(ep.EndpointPort))
if value, ok := c.peerAuthDisabledMTLS[peerAuthnKey]; ok {
// avoid recomputing since most EPs will have the same labels/port
return value
}
c.peerAuthDisabledMTLS[peerAuthnKey] = factory.
NewPolicyApplier(c.push, ep.Namespace, ep.Labels).
GetMutualTLSModeForPort(ep.EndpointPort) == model.MTLSDisable
return c.peerAuthDisabledMTLS[peerAuthnKey]
}
// mtls disabled by PeerAuthentication
if mtlsDisabledByPeerAuthentication(ep) {
c.mtlsDisabledHosts[lbEpKey(ep.EnvoyEndpoint)] = struct{}{}
}
}
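// mtlsModeForDestinationRule returns the TLS mode the destination rule applies to this endpoint, checking subset-level policies first and falling back to the root traffic policy.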
func (c *mtlsChecker) mtlsModeForDestinationRule(ep *model.IstioEndpoint) *networkingapi.ClientTLSSettings_TLSmode {
if c.destinationRule == nil || len(c.destinationRule.Subsets) == 0 {
return c.rootPolicyMode
}
drSubsetKey := ep.Labels.String()
if value, ok := c.subsetPolicyMode[drSubsetKey]; ok {
// avoid recomputing since most EPs will have the same labels
return value
}
subsetValue := c.rootPolicyMode
for _, subset := range c.destinationRule.Subsets {
if labels.Instance(subset.Labels).SubsetOf(ep.Labels) {
mode := trafficPolicyTLSModeForPort(subset.TrafficPolicy, c.svcPort)
if mode != nil {
subsetValue = mode
}
break
}
}
c.subsetPolicyMode[drSubsetKey] = subsetValue
return subsetValue
}
// mtlsModeForDefaultTrafficPolicy returns the TLS mode set by the default traffic policy of the given destination rule for the given port, or nil if none is set.
func mtlsModeForDefaultTrafficPolicy(destinationRule *config.Config, port int) *networkingapi.ClientTLSSettings_TLSmode {
if destinationRule == nil {
return nil
}
dr, ok := destinationRule.Spec.(*networkingapi.DestinationRule)
if !ok || dr == nil {
return nil
}
return trafficPolicyTLSModeForPort(dr.GetTrafficPolicy(), port)
}
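// trafficPolicyTLSModeForPort returns the TLS mode from the traffic policy, with a matching port-level setting taking precedence over the top-level setting.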
func trafficPolicyTLSModeForPort(tp *networkingapi.TrafficPolicy, port int) *networkingapi.ClientTLSSettings_TLSmode {
if tp == nil {
return nil
}
var mode *networkingapi.ClientTLSSettings_TLSmode
if tp.Tls != nil {
mode = &tp.Tls.Mode
}
// A port-level setting matching this cluster's port overrides the top-level TLS mode.
for _, portSettings := range tp.GetPortLevelSettings() {
if int(portSettings.Port.Number) == port && portSettings.Tls != nil {
mode = &portSettings.Tls.Mode
break
}
}
return mode
}
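// lbEpKey returns a host key ("address:port", or "path:mode" for pipes) used to index the mtlsDisabledHosts cache.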
func lbEpKey(b *endpoint.LbEndpoint) string {
if addr := b.GetEndpoint().GetAddress().GetSocketAddress(); addr != nil {
return addr.Address + ":" + strconv.Itoa(int(addr.GetPortValue()))
}
if addr := b.GetEndpoint().GetAddress().GetPipe(); addr != nil {
return addr.GetPath() + ":" + strconv.Itoa(int(addr.GetMode()))
}
return ""
}