blob: 835c27a5fc7e60403498ca21108b4387880d6061 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package grpcgen
import (
"fmt"
"strings"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/networking/util"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
tlsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
networking "istio.io/api/networking/v1alpha3"
)
// clusterBuilder carries the inputs needed to generate the CDS clusters (the
// default cluster plus any DestinationRule subset clusters) for one
// hostname/port pair on behalf of a single proxy.
type clusterBuilder struct {
// push is the push context used to resolve services and DestinationRules.
push *model.PushContext
// node is the proxy the clusters are generated for.
node *model.Proxy
// defaultClusterName is the subset-less cluster key; hostname and portNum
// below are parsed out of it by newClusterBuilder.
defaultClusterName string
hostname host.Name
portNum int
// svc and port are the resolved service and service port. newClusterBuilder
// returns an error instead of a builder when either cannot be resolved, but
// the methods still nil-check them defensively.
svc *model.Service
port *model.Port
// filter is the set of cluster names requested for this default cluster key
// (the default name itself and/or subset cluster names).
filter sets.String
}
// BuildClusters generates the CDS resources for the requested cluster names.
//
// The names are grouped by their default (subset-less) cluster key, one
// clusterBuilder is created per group, and every cluster the builders produce
// is wrapped in a discovery.Resource. Groups whose builder cannot be created
// are logged and skipped. A warning is logged when no names were provided and
// nothing was generated.
func (g *GrpcConfigGenerator) BuildClusters(node *model.Proxy, push *model.PushContext, names []string) model.Resources {
	generated := make([]*cluster.Cluster, 0, len(names))
	for defaultName, requested := range newClusterFilter(names) {
		cb, err := newClusterBuilder(node, push, defaultName, requested)
		if err != nil {
			// A single unresolved cluster must not fail the whole response.
			log.Warn(err)
			continue
		}
		generated = append(generated, cb.build()...)
	}

	resources := make(model.Resources, len(generated))
	for i, c := range generated {
		resources[i] = &discovery.Resource{
			Name:     c.Name,
			Resource: protoconv.MessageToAny(c),
		}
	}

	if len(resources) == 0 && len(names) == 0 {
		log.Warnf("did not generate any cds for %s; no names provided", node.ID)
	}
	return resources
}
// newClusterFilter groups the requested cluster names by their default
// cluster key, i.e. the same direction/hostname/port with an empty subset.
// Each map value is the set of originally requested names that share that key.
func newClusterFilter(names []string) map[string]sets.String {
	grouped := make(map[string]sets.String)
	for i := range names {
		requested := names[i]
		direction, _, hostname, port := model.ParseSubsetKey(requested)
		sets.InsertOrNew(grouped, model.BuildSubsetKey(direction, "", hostname, port), requested)
	}
	return grouped
}
// newClusterBuilder parses defaultClusterName into hostname and port, resolves
// the matching service and service port from the push context, and returns a
// builder for them.
//
// It returns an error when the key cannot be parsed into a non-empty hostname
// and non-zero port, when no service is found for the hostname, or when the
// service does not expose the port.
func newClusterBuilder(node *model.Proxy, push *model.PushContext, defaultClusterName string, filter sets.String) (*clusterBuilder, error) {
	_, _, hostname, portNum := model.ParseSubsetKey(defaultClusterName)
	if hostname == "" || portNum == 0 {
		return nil, fmt.Errorf("failed parsing subset key: %s", defaultClusterName)
	}

	// Resolve the service first; the port lookup depends on it.
	svc := push.ServiceForHostname(node, hostname)
	if svc == nil {
		return nil, fmt.Errorf("cds gen for %s: did not find service for cluster %s", node.ID, defaultClusterName)
	}

	port, found := svc.Ports.GetByPort(portNum)
	if !found {
		return nil, fmt.Errorf("cds gen for %s: did not find port %d in service for cluster %s", node.ID, portNum, defaultClusterName)
	}

	cb := &clusterBuilder{
		push:               push,
		node:               node,
		defaultClusterName: defaultClusterName,
		hostname:           hostname,
		portNum:            portNum,
		svc:                svc,
		port:               port,
		filter:             filter,
	}
	return cb, nil
}
// build generates the clusters for this builder's hostname/port.
//
// The default cluster is emitted when it was requested (or no filter is set)
// or when the service's DestinationRule configures ISTIO_MUTUAL/DUBBO_MUTUAL
// TLS at the top level, so that clients can use mTLS even if they did not ask
// for the default cluster explicitly. Subset clusters (and TLS settings) are
// then added by applyDestinationRule. The returned slice holds the default
// cluster first (when present) followed by the subset clusters.
func (b *clusterBuilder) build() []*cluster.Cluster {
	var defaultCluster *cluster.Cluster
	defaultRequested := b.filter == nil || b.filter.Contains(b.defaultClusterName)

	// Look up the DestinationRule by the requested hostname first and fall back
	// to the service's own hostname when they differ.
	// Reference: https://istio.io/latest/blog/2021/proxyless-grpc/#enabling-mtls
	var dr *networking.DestinationRule
	if b.svc != nil {
		dr = b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.hostname)
		if dr == nil && b.svc.Hostname != b.hostname {
			dr = b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.svc.Hostname)
		}
	}

	// Only mutual TLS modes force generation of the default cluster.
	hasTLSInDR := false
	if dr != nil && dr.TrafficPolicy != nil && dr.TrafficPolicy.Tls != nil {
		tlsMode := dr.TrafficPolicy.Tls.Mode
		hasTLSInDR = tlsMode == networking.ClientTLSSettings_ISTIO_MUTUAL || tlsMode.String() == "DUBBO_MUTUAL"
	}

	if defaultRequested || hasTLSInDR {
		defaultCluster = b.edsCluster(b.defaultClusterName)
		if defaultCluster.CommonLbConfig == nil {
			defaultCluster.CommonLbConfig = &cluster.Cluster_CommonLbConfig{}
		}
		// List every health status in OverrideHostStatus so clients may fall
		// back to non-healthy endpoints instead of ending up with no targets
		// ("weighted-target: no targets to pick from").
		defaultCluster.CommonLbConfig.OverrideHostStatus = &core.HealthStatusSet{
			Statuses: []core.HealthStatus{
				core.HealthStatus_HEALTHY,
				core.HealthStatus_UNHEALTHY,
				core.HealthStatus_DRAINING,
				core.HealthStatus_UNKNOWN,
				core.HealthStatus_DEGRADED,
			},
		}
		// The TLS transport socket itself is attached in applyDestinationRule.
		if hasTLSInDR {
			log.Infof("clusterBuilder.build: generated default cluster %s (required for ISTIO_MUTUAL TLS)", b.defaultClusterName)
		} else {
			log.Infof("clusterBuilder.build: generated default cluster %s", b.defaultClusterName)
		}
	}

	subsetClusters, newDefaultCluster := b.applyDestinationRule(defaultCluster)
	// applyDestinationRule may create the default cluster itself when it finds
	// TLS settings that were not visible here; prefer that one.
	if newDefaultCluster != nil {
		defaultCluster = newDefaultCluster
	}

	out := make([]*cluster.Cluster, 0, 1+len(subsetClusters))
	defaultCount := 0
	if defaultCluster != nil {
		out = append(out, defaultCluster)
		defaultCount = 1
	}
	out = append(out, subsetClusters...)

	// Report the real number of default clusters; it is 0 when only subset
	// clusters were requested and no mutual TLS is configured.
	log.Infof("clusterBuilder.build: generated %d clusters total (%d default + %d subsets) for %s",
		len(out), defaultCount, len(subsetClusters), b.defaultClusterName)
	return out
}
// edsCluster returns a minimal EDS cluster with the given name. Endpoints are
// discovered over ADS using the cluster name as the EDS service name, and the
// load-balancing policy is ROUND_ROBIN.
func (b *clusterBuilder) edsCluster(name string) *cluster.Cluster {
	adsSource := &core.ConfigSource{
		ConfigSourceSpecifier: &core.ConfigSource_Ads{
			Ads: &core.AggregatedConfigSource{},
		},
	}

	c := &cluster.Cluster{
		Name:        name,
		AltStatName: util.DelimitedStatsPrefix(name),
		// ROUND_ROBIN is set explicitly for proxyless gRPC xDS clients.
		LbPolicy: cluster.Cluster_ROUND_ROBIN,
	}
	c.ClusterDiscoveryType = &cluster.Cluster_Type{Type: cluster.Cluster_EDS}
	c.EdsClusterConfig = &cluster.Cluster_EdsClusterConfig{
		ServiceName: name,
		EdsConfig:   adsSource,
	}
	return c
}
// applyDestinationRule applies the service's DestinationRule to the clusters
// being built.
//
// It attaches a TLS transport socket to defaultCluster when the rule's
// top-level TrafficPolicy uses ISTIO_MUTUAL/DUBBO_MUTUAL, and generates one
// cluster per named subset. A subset cluster is generated when it was
// explicitly requested, when no filter is set, or when the default cluster was
// requested.
//
// Returns the generated subset clusters and, only when defaultCluster was nil
// but the rule requires mutual TLS, a newly created default cluster that the
// caller should use. Both results are nil when the service/port is unset, no
// DestinationRule exists, or the rule has neither subsets nor mutual TLS.
func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster) (subsetClusters []*cluster.Cluster, newDefaultCluster *cluster.Cluster) {
	if b.svc == nil || b.port == nil {
		log.Warnf("applyDestinationRule: service or port is nil for %s", b.defaultClusterName)
		return nil, nil
	}
	log.Infof("applyDestinationRule: looking for DestinationRule for service %s/%s (hostname=%s, port=%d)",
		b.svc.Attributes.Namespace, b.svc.Attributes.Name, b.hostname, b.portNum)

	// Try the requested hostname first, then the service's own hostname.
	dr := b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.hostname)
	if dr == nil && b.svc.Hostname != b.hostname {
		dr = b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.svc.Hostname)
	}
	if dr == nil {
		log.Warnf("applyDestinationRule: no DestinationRule found for %s/%s or %s", b.svc.Attributes.Namespace, b.hostname, b.svc.Hostname)
		return nil, nil
	}

	// Only mutual TLS modes are relevant for proxyless gRPC.
	hasTLS := false
	if tp := dr.TrafficPolicy; tp != nil && tp.Tls != nil {
		tlsMode := tp.Tls.Mode
		hasTLS = tlsMode == networking.ClientTLSSettings_ISTIO_MUTUAL || tlsMode.String() == "DUBBO_MUTUAL"
	}

	if len(dr.Subsets) == 0 && !hasTLS {
		log.Warnf("applyDestinationRule: DestinationRule found for %s/%s but has no subsets and no TLS policy", b.svc.Attributes.Namespace, b.hostname)
		return nil, nil
	}
	log.Infof("applyDestinationRule: found DestinationRule for %s/%s with %d subsets, defaultCluster requested=%v, hasTLS=%v",
		b.svc.Attributes.Namespace, b.hostname, len(dr.Subsets), defaultCluster != nil, hasTLS)

	// newLbConfig lists every health status in OverrideHostStatus so clients
	// may fall back to non-healthy endpoints when no healthy one exists.
	newLbConfig := func() *cluster.Cluster_CommonLbConfig {
		return &cluster.Cluster_CommonLbConfig{
			OverrideHostStatus: &core.HealthStatusSet{
				Statuses: []core.HealthStatus{
					core.HealthStatus_HEALTHY,
					core.HealthStatus_UNHEALTHY,
					core.HealthStatus_DRAINING,
					core.HealthStatus_UNKNOWN,
					core.HealthStatus_DEGRADED,
				},
			},
		}
	}

	switch {
	case defaultCluster != nil && defaultCluster.TransportSocket == nil:
		// The default cluster takes its TLS from the top-level TrafficPolicy.
		if hasTLS {
			log.Infof("applyDestinationRule: applying TLS to default cluster %s (DestinationRule has ISTIO_MUTUAL)", b.defaultClusterName)
			b.applyTLSForCluster(defaultCluster, nil)
		} else {
			log.Debugf("applyDestinationRule: skipping TLS for default cluster %s (DestinationRule has no TrafficPolicy or TLS)", b.defaultClusterName)
		}
	case defaultCluster == nil && hasTLS:
		// The caller did not create a default cluster but the rule requires
		// mutual TLS: create it here and hand it back. Processing continues
		// below so that requested subset clusters are still generated.
		log.Warnf("applyDestinationRule: default cluster was not generated in build() but DestinationRule has TLS, generating it now")
		defaultCluster = b.edsCluster(b.defaultClusterName)
		defaultCluster.CommonLbConfig = newLbConfig()
		log.Infof("applyDestinationRule: applying TLS to newly generated default cluster %s (DestinationRule has ISTIO_MUTUAL)", b.defaultClusterName)
		b.applyTLSForCluster(defaultCluster, nil)
		newDefaultCluster = defaultCluster
	}

	// Subset clusters share the default cluster's LB config when there is one.
	var commonLbConfig *cluster.Cluster_CommonLbConfig
	if defaultCluster != nil {
		commonLbConfig = defaultCluster.CommonLbConfig
	} else {
		commonLbConfig = newLbConfig()
	}

	defaultClusterRequested := defaultCluster != nil
	if b.filter != nil {
		defaultClusterRequested = b.filter.Contains(b.defaultClusterName)
	}

	for _, subset := range dr.Subsets {
		if subset == nil || subset.Name == "" {
			continue
		}
		clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, b.hostname, b.portNum)

		// A subset that was not explicitly requested is still generated when
		// the default cluster was requested, so weighted routes across subsets
		// can resolve their clusters.
		shouldGenerate := true
		if b.filter != nil && !b.filter.Contains(clusterName) {
			shouldGenerate = defaultClusterRequested
		}
		if !shouldGenerate {
			log.Debugf("applyDestinationRule: skipping subset cluster %s (not requested and default not requested)", clusterName)
			continue
		}

		log.Infof("applyDestinationRule: generating subset cluster %s for subset %s", clusterName, subset.Name)
		subsetCluster := b.edsCluster(clusterName)
		subsetCluster.CommonLbConfig = commonLbConfig
		b.applyTLSForCluster(subsetCluster, subset)
		subsetClusters = append(subsetClusters, subsetCluster)
	}

	log.Infof("applyDestinationRule: generated %d subset clusters for %s/%s", len(subsetClusters), b.svc.Attributes.Namespace, b.hostname)
	return subsetClusters, newDefaultCluster
}
// applyTLSForCluster attaches a gRPC-compatible TLS transport socket to c
// whenever the effective TLS settings specify ISTIO_MUTUAL/DUBBO_MUTUAL mode.
//
// The effective settings come from the subset's TrafficPolicy when that policy
// carries TLS settings; otherwise the DestinationRule's top-level
// TrafficPolicy is used, so a subset that only overrides unrelated settings
// still inherits the rule-level TLS. Passing a nil subset selects the
// top-level policy. The cluster is left untouched when c or the service is
// nil, no DestinationRule or TLS settings exist, the mode is not a mutual
// mode, or the TLS context cannot be built.
func (b *clusterBuilder) applyTLSForCluster(c *cluster.Cluster, subset *networking.Subset) {
	if c == nil || b.svc == nil {
		return
	}

	// Try the requested hostname first, then the service's own hostname.
	dr := b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.hostname)
	if dr == nil && b.svc.Hostname != b.hostname {
		dr = b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.svc.Hostname)
	}
	if dr == nil {
		log.Warnf("applyTLSForCluster: no DestinationRule found for cluster %s (namespace=%s, hostname=%s, service hostname=%s)",
			c.Name, b.svc.Attributes.Namespace, b.hostname, b.svc.Hostname)
		return
	}

	var policy *networking.TrafficPolicy
	if subset != nil && subset.TrafficPolicy != nil && subset.TrafficPolicy.Tls != nil {
		// The subset overrides TLS explicitly (including explicit non-mutual modes).
		policy = subset.TrafficPolicy
		log.Infof("applyTLSForCluster: using TrafficPolicy from subset %s for cluster %s", subset.Name, c.Name)
	} else {
		// No subset-level TLS: inherit the DestinationRule-level policy.
		policy = dr.TrafficPolicy
		if policy != nil {
			log.Infof("applyTLSForCluster: using top-level TrafficPolicy for cluster %s", c.Name)
		}
	}

	if policy == nil {
		log.Warnf("applyTLSForCluster: no TrafficPolicy found in DestinationRule for cluster %s", c.Name)
		return
	}
	if policy.Tls == nil {
		log.Warnf("applyTLSForCluster: no TLS settings in TrafficPolicy for cluster %s", c.Name)
		return
	}

	mode := policy.Tls.Mode
	modeStr := mode.String()
	if mode != networking.ClientTLSSettings_ISTIO_MUTUAL && modeStr != "DUBBO_MUTUAL" {
		log.Debugf("applyTLSForCluster: TLS mode %v (%s) not supported for gRPC proxyless, skipping", mode, modeStr)
		return
	}

	tlsContext := b.buildUpstreamTLSContext(c, policy.Tls)
	if tlsContext == nil {
		log.Warnf("applyTLSForCluster: failed to build TLS context for cluster %s", c.Name)
		return
	}

	// Log the SNI configuration for debugging.
	sni := tlsContext.Sni
	if sni == "" {
		log.Warnf("applyTLSForCluster: SNI is empty for cluster %s, this may cause TLS handshake failures", c.Name)
	} else {
		log.Debugf("applyTLSForCluster: using SNI=%s for cluster %s", sni, c.Name)
	}

	c.TransportSocket = &core.TransportSocket{
		Name:       "envoy.transport_sockets.tls",
		ConfigType: &core.TransportSocket_TypedConfig{TypedConfig: protoconv.MessageToAny(tlsContext)},
	}
	log.Infof("applyTLSForCluster: applied %v TLS transport socket to cluster %s (SNI=%s)", mode, c.Name, sni)
}
// buildUpstreamTLSContext assembles the UpstreamTlsContext for cluster c on
// top of the shared context returned by buildCommonTLSContext. It returns nil
// when that shared context is unavailable.
//
// The SNI is chosen in this order: the value from tlsSettings, the service
// hostname, the fourth "|"-separated field of the cluster name, and finally
// the cluster name itself. ALPN is always set to "h2".
func (b *clusterBuilder) buildUpstreamTLSContext(c *cluster.Cluster, tlsSettings *networking.ClientTLSSettings) *tlsv3.UpstreamTlsContext {
	common := buildCommonTLSContext()
	if common == nil {
		return nil
	}

	sni := tlsSettings.GetSni()
	switch {
	case sni != "":
		// An explicitly configured SNI wins.
	case b.svc != nil && b.svc.Hostname != "":
		// The SNI should be the service hostname rather than the cluster name.
		sni = string(b.svc.Hostname)
	default:
		// Cluster name format: outbound|port|subset|hostname
		if fields := strings.Split(c.Name, "|"); len(fields) >= 4 {
			sni = fields[3]
		} else {
			// Last resort: the cluster name (not ideal but better than empty).
			sni = c.Name
			log.Warnf("buildUpstreamTLSContext: using cluster name as SNI fallback for %s (should be service hostname)", c.Name)
		}
	}

	tlsContext := &tlsv3.UpstreamTlsContext{
		CommonTlsContext: common,
		Sni:              sni,
	}
	// Proxyless gRPC always speaks HTTP/2, so advertise h2 via ALPN.
	tlsContext.CommonTlsContext.AlpnProtocols = []string{"h2"}
	return tlsContext
}