blob: 9dea9b678175951a1eda162a275f3ffee6d3fe1b [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcgen
import (
"fmt"
)
import (
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
networking "istio.io/api/networking/v1alpha3"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
corexds "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/core/v1alpha3"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
// BuildClusters handles a gRPC CDS request, used with the 'ApiListener' style of requests.
// The main difference is that the request includes Resources to filter.
func (g *GrpcConfigGenerator) BuildClusters(node *model.Proxy, push *model.PushContext, names []string) model.Resources {
filter := newClusterFilter(names)
clusters := make([]*cluster.Cluster, 0, len(names))
for defaultClusterName, subsetFilter := range filter {
builder, err := newClusterBuilder(node, push, defaultClusterName, subsetFilter)
if err != nil {
log.Warn(err)
continue
}
clusters = append(clusters, builder.build()...)
}
resp := make(model.Resources, 0, len(clusters))
for _, c := range clusters {
resp = append(resp, &discovery.Resource{
Name: c.Name,
Resource: util.MessageToAny(c),
})
}
if len(resp) == 0 && len(names) == 0 {
log.Warnf("did not generate any cds for %s; no names provided", node.ID)
}
return resp
}
// newClusterFilter maps a non-subset cluster name to the list of actual cluster names (default or subset) actually
// requested by the client. gRPC will usually request a group of clusters that are used in the same route; in some
// cases this means subsets associated with the same default cluster aren't all expected in the same CDS response.
func newClusterFilter(names []string) map[string]sets.Set {
filter := map[string]sets.Set{}
for _, name := range names {
dir, _, hn, p := model.ParseSubsetKey(name)
defaultKey := model.BuildSubsetKey(dir, "", hn, p)
if _, ok := filter[defaultKey]; !ok {
filter[defaultKey] = sets.New()
}
filter[defaultKey].Insert(name)
}
return filter
}
// clusterBuilder is responsible for building a single default and subset clusters for a service
// TODO re-use the v1alpha3.ClusterBuilder:
// Most of the logic is similar, I think we can just share the code if we expose:
// * BuildSubsetCluster
// * BuildDefaultCluster
// * BuildClusterOpts and members
// * Add something to allow us to override how tlscontext is built
type clusterBuilder struct {
push *model.PushContext
node *model.Proxy
// guaranteed to be set in init
defaultClusterName string
requestedClusterName string
hostname host.Name
portNum int
// may not be set
svc *model.Service
port *model.Port
filter sets.Set
}
func newClusterBuilder(node *model.Proxy, push *model.PushContext, defaultClusterName string, filter sets.Set) (*clusterBuilder, error) {
_, _, hostname, portNum := model.ParseSubsetKey(defaultClusterName)
if hostname == "" || portNum == 0 {
return nil, fmt.Errorf("failed parsing subset key: %s", defaultClusterName)
}
// try to resolve the service and port
var port *model.Port
svc := push.ServiceForHostname(node, hostname)
if svc == nil {
log.Warnf("cds gen for %s: did not find service for cluster %s", node.ID, defaultClusterName)
} else {
var ok bool
port, ok = svc.Ports.GetByPort(portNum)
if !ok {
log.Warnf("cds gen for %s: did not find port %d in service for cluster %s", node.ID, portNum, defaultClusterName)
}
}
return &clusterBuilder{
node: node,
push: push,
defaultClusterName: defaultClusterName,
hostname: hostname,
portNum: portNum,
filter: filter,
svc: svc,
port: port,
}, nil
}
// subsetFilter returns the requestedClusterName if it isn't the default cluster
// for subset clusters, gRPC may request them individually
func (b *clusterBuilder) subsetFilter() string {
if b.defaultClusterName == b.requestedClusterName {
return ""
}
return b.requestedClusterName
}
func (b *clusterBuilder) build() []*cluster.Cluster {
var defaultCluster *cluster.Cluster
if b.filter.Contains(b.defaultClusterName) {
defaultCluster = edsCluster(b.defaultClusterName)
}
subsetClusters := b.applyDestinationRule(defaultCluster)
out := make([]*cluster.Cluster, 0, 1+len(subsetClusters))
if defaultCluster != nil {
out = append(out, defaultCluster)
}
return append(out, subsetClusters...)
}
// edsCluster creates a simple cluster to read endpoints from ads/eds.
func edsCluster(name string) *cluster.Cluster {
return &cluster.Cluster{
Name: name,
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{
ServiceName: name,
EdsConfig: &core.ConfigSource{
ConfigSourceSpecifier: &core.ConfigSource_Ads{
Ads: &core.AggregatedConfigSource{},
},
},
},
}
}
// applyDestinationRule mutates the default cluster to reflect traffic policies, and returns a set of additional
// subset clusters if specified by a destination rule
func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster) (subsetClusters []*cluster.Cluster) {
if b.svc == nil || b.port == nil {
return nil
}
// resolve policy from context
destinationRule := corexds.CastDestinationRule(b.node.SidecarScope.DestinationRule(
model.TrafficDirectionOutbound, b.node, b.svc.Hostname))
trafficPolicy := corexds.MergeTrafficPolicy(nil, destinationRule.GetTrafficPolicy(), b.port)
// setup default cluster
b.applyTrafficPolicy(defaultCluster, trafficPolicy)
// subset clusters
if len(destinationRule.GetSubsets()) > 0 {
subsetClusters = make([]*cluster.Cluster, 0, len(destinationRule.GetSubsets()))
for _, subset := range destinationRule.GetSubsets() {
subsetKey := subsetClusterKey(subset.Name, string(b.hostname), b.portNum)
if !b.filter.Contains(subsetKey) {
continue
}
c := edsCluster(subsetKey)
trafficPolicy := corexds.MergeTrafficPolicy(trafficPolicy, subset.TrafficPolicy, b.port)
b.applyTrafficPolicy(c, trafficPolicy)
subsetClusters = append(subsetClusters, c)
}
}
return
}
// applyTrafficPolicy mutates the give cluster (if not-nil) so that the given merged traffic policy applies.
func (b *clusterBuilder) applyTrafficPolicy(c *cluster.Cluster, trafficPolicy *networking.TrafficPolicy) {
// cluster can be nil if it wasn't requested
if c == nil {
return
}
b.applyTLS(c, trafficPolicy)
b.applyLoadBalancing(c, trafficPolicy)
// TODO status or log when unsupported features are included
}
func (b *clusterBuilder) applyLoadBalancing(c *cluster.Cluster, policy *networking.TrafficPolicy) {
switch policy.GetLoadBalancer().GetSimple() {
case networking.LoadBalancerSettings_ROUND_ROBIN, networking.LoadBalancerSettings_UNSPECIFIED:
// ok
default:
log.Warnf("cannot apply LbPolicy %s to %s", policy.LoadBalancer.GetSimple(), b.node.ID)
}
corexds.ApplyRingHashLoadBalancer(c, policy.GetLoadBalancer())
}
func (b *clusterBuilder) applyTLS(c *cluster.Cluster, policy *networking.TrafficPolicy) {
// TODO for now, we leave mTLS *off* by default:
// 1. We don't know if the client uses xds.NewClientCredentials; these settings will be ignored if not
// 2. We cannot reach servers in PERMISSIVE mode; gRPC doesn't allow us to override the alpn to one of Istio's
// 3. Once we support gRPC servers, we have no good way to detect if a server is implemented with xds.NewGrpcServer and will actually support our config
// For these reasons, support only explicit tls configuration.
switch policy.GetTls().GetMode() {
case networking.ClientTLSSettings_DISABLE:
// nothing to do
case networking.ClientTLSSettings_SIMPLE:
// TODO support this
case networking.ClientTLSSettings_MUTUAL:
// TODO support this
case networking.ClientTLSSettings_ISTIO_MUTUAL:
tlsCtx := buildUpstreamTLSContext(b.push.ServiceAccounts[b.hostname][b.portNum])
c.TransportSocket = &core.TransportSocket{
Name: transportSocketName,
ConfigType: &core.TransportSocket_TypedConfig{TypedConfig: util.MessageToAny(tlsCtx)},
}
}
}
// TransportSocket proto message has a `name` field which is expected to be set to exactly this value by the
// management server (see grpc/xds/internal/client/xds.go securityConfigFromCluster).
const transportSocketName = "envoy.transport_sockets.tls"
func buildUpstreamTLSContext(sans []string) *tls.UpstreamTlsContext {
return &tls.UpstreamTlsContext{
CommonTlsContext: buildCommonTLSContext(sans),
}
}