blob: 95a2e544785c487e9f566e15c203ca44e8136047 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v1alpha3
import (
"time"
)
import (
mysql "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/filters/network/mysql_proxy/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
mongo "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/mongo_proxy/v3"
redis "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/redis_proxy/v3"
tcp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3"
hashpolicy "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"google.golang.org/protobuf/types/known/durationpb"
networking "istio.io/api/networking/v1alpha3"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
istionetworking "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking"
istioroute "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/core/v1alpha3/route"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/telemetry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
xdsfilters "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/filters"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
)
// redisOpTimeout is the default operation timeout for the Redis proxy filter.
var redisOpTimeout = 5 * time.Second
func buildMetadataExchangeNetworkFilters(class istionetworking.ListenerClass) []*listener.Filter {
filterstack := make([]*listener.Filter, 0)
// We add metadata exchange on inbound only; outbound is handled in cluster filter
if class == istionetworking.ListenerClassSidecarInbound && features.MetadataExchange {
filterstack = append(filterstack, xdsfilters.TCPListenerMx)
}
return filterstack
}
func buildMetricsNetworkFilters(push *model.PushContext, proxy *model.Proxy, class istionetworking.ListenerClass) []*listener.Filter {
return push.Telemetry.TCPFilters(proxy, class)
}
// setAccessLogAndBuildTCPFilter sets the AccessLog configuration in the given
// TcpProxy instance and builds a TCP filter out of it.
func setAccessLogAndBuildTCPFilter(push *model.PushContext, node *model.Proxy, config *tcp.TcpProxy, class istionetworking.ListenerClass) *listener.Filter {
accessLogBuilder.setTCPAccessLog(push, node, config, class)
tcpFilter := &listener.Filter{
Name: wellknown.TCPProxy,
ConfigType: &listener.Filter_TypedConfig{TypedConfig: util.MessageToAny(config)},
}
return tcpFilter
}
// buildOutboundNetworkFiltersWithSingleDestination takes a single cluster name
// and builds a stack of network filters.
func buildOutboundNetworkFiltersWithSingleDestination(push *model.PushContext, node *model.Proxy,
statPrefix, clusterName, subsetName string, port *model.Port, destinationRule *networking.DestinationRule) []*listener.Filter {
tcpProxy := &tcp.TcpProxy{
StatPrefix: statPrefix,
ClusterSpecifier: &tcp.TcpProxy_Cluster{Cluster: clusterName},
}
idleTimeout, err := time.ParseDuration(node.Metadata.IdleTimeout)
if err == nil {
tcpProxy.IdleTimeout = durationpb.New(idleTimeout)
}
maybeSetHashPolicy(destinationRule, tcpProxy, subsetName)
class := model.OutboundListenerClass(node.Type)
tcpFilter := setAccessLogAndBuildTCPFilter(push, node, tcpProxy, class)
var filters []*listener.Filter
filters = append(filters, buildMetadataExchangeNetworkFilters(class)...)
filters = append(filters, buildMetricsNetworkFilters(push, node, class)...)
filters = append(filters, buildNetworkFiltersStack(port.Protocol, tcpFilter, statPrefix, clusterName)...)
return filters
}
// buildOutboundNetworkFiltersWithWeightedClusters takes a set of weighted
// destination routes and builds a stack of network filters.
func buildOutboundNetworkFiltersWithWeightedClusters(node *model.Proxy, routes []*networking.RouteDestination,
push *model.PushContext, port *model.Port, configMeta config.Meta, destinationRule *networking.DestinationRule) []*listener.Filter {
statPrefix := configMeta.Name + "." + configMeta.Namespace
clusterSpecifier := &tcp.TcpProxy_WeightedClusters{
WeightedClusters: &tcp.TcpProxy_WeightedCluster{},
}
tcpProxy := &tcp.TcpProxy{
StatPrefix: statPrefix,
ClusterSpecifier: clusterSpecifier,
}
idleTimeout, err := time.ParseDuration(node.Metadata.IdleTimeout)
if err == nil {
tcpProxy.IdleTimeout = durationpb.New(idleTimeout)
}
for _, route := range routes {
service := push.ServiceForHostname(node, host.Name(route.Destination.Host))
if route.Weight > 0 {
clusterName := istioroute.GetDestinationCluster(route.Destination, service, port.Port)
clusterSpecifier.WeightedClusters.Clusters = append(clusterSpecifier.WeightedClusters.Clusters, &tcp.TcpProxy_WeightedCluster_ClusterWeight{
Name: clusterName,
Weight: uint32(route.Weight),
})
}
}
// For weighted clusters set hash policy if any of the upstream destinations have sourceIP.
maybeSetHashPolicy(destinationRule, tcpProxy, "")
// TODO: Need to handle multiple cluster names for Redis
clusterName := clusterSpecifier.WeightedClusters.Clusters[0].Name
class := model.OutboundListenerClass(node.Type)
tcpFilter := setAccessLogAndBuildTCPFilter(push, node, tcpProxy, class)
var filters []*listener.Filter
filters = append(filters, buildMetadataExchangeNetworkFilters(class)...)
filters = append(filters, buildMetricsNetworkFilters(push, node, class)...)
filters = append(filters, buildNetworkFiltersStack(port.Protocol, tcpFilter, statPrefix, clusterName)...)
return filters
}
func maybeSetHashPolicy(destinationRule *networking.DestinationRule, tcpProxy *tcp.TcpProxy, subsetName string) {
if destinationRule != nil {
useSourceIP := destinationRule.GetTrafficPolicy().GetLoadBalancer().GetConsistentHash().GetUseSourceIp()
for _, subset := range destinationRule.Subsets {
if subset.Name != subsetName {
continue
}
// If subset has load balancer - see if it is also consistent hash source IP
if subset.TrafficPolicy != nil && subset.TrafficPolicy.LoadBalancer != nil {
if subset.TrafficPolicy.LoadBalancer.GetConsistentHash() != nil {
useSourceIP = subset.TrafficPolicy.LoadBalancer.GetConsistentHash().GetUseSourceIp()
} else {
// This means that subset has defined non sourceIP consistent hash load balancer.
useSourceIP = false
}
}
break
}
// If destinationrule has consistent hash source ip set, use it for tcp proxy.
if useSourceIP {
tcpProxy.HashPolicy = []*hashpolicy.HashPolicy{{PolicySpecifier: &hashpolicy.HashPolicy_SourceIp_{
SourceIp: &hashpolicy.HashPolicy_SourceIp{},
}}}
}
}
}
// buildNetworkFiltersStack builds a slice of network filters based on
// the protocol in use and the given TCP filter instance.
func buildNetworkFiltersStack(p protocol.Instance, tcpFilter *listener.Filter, statPrefix string, clusterName string) []*listener.Filter {
filterstack := make([]*listener.Filter, 0)
switch p {
case protocol.Mongo:
if features.EnableMongoFilter {
filterstack = append(filterstack, buildMongoFilter(statPrefix), tcpFilter)
} else {
filterstack = append(filterstack, tcpFilter)
}
case protocol.Redis:
if features.EnableRedisFilter {
// redis filter has route config, it is a terminating filter, no need append tcp filter.
filterstack = append(filterstack, buildRedisFilter(statPrefix, clusterName))
} else {
filterstack = append(filterstack, tcpFilter)
}
case protocol.MySQL:
if features.EnableMysqlFilter {
filterstack = append(filterstack, buildMySQLFilter(statPrefix))
}
filterstack = append(filterstack, tcpFilter)
default:
filterstack = append(filterstack, tcpFilter)
}
return filterstack
}
// buildOutboundNetworkFilters generates a TCP proxy network filter for outbound
// connections. In addition, it generates protocol specific filters (e.g., Mongo
// filter).
func buildOutboundNetworkFilters(node *model.Proxy,
routes []*networking.RouteDestination, push *model.PushContext,
port *model.Port, configMeta config.Meta) []*listener.Filter {
service := push.ServiceForHostname(node, host.Name(routes[0].Destination.Host))
var destinationRule *networking.DestinationRule
if service != nil {
destinationRule = CastDestinationRule(node.SidecarScope.DestinationRule(model.TrafficDirectionOutbound, node, service.Hostname))
}
if len(routes) == 1 {
clusterName := istioroute.GetDestinationCluster(routes[0].Destination, service, port.Port)
statPrefix := clusterName
// If stat name is configured, build the stat prefix from configured pattern.
if len(push.Mesh.OutboundClusterStatName) != 0 && service != nil {
statPrefix = telemetry.BuildStatPrefix(push.Mesh.OutboundClusterStatName, routes[0].Destination.Host,
routes[0].Destination.Subset, port, &service.Attributes)
}
return buildOutboundNetworkFiltersWithSingleDestination(push, node, statPrefix, clusterName, routes[0].Destination.Subset, port, destinationRule)
}
return buildOutboundNetworkFiltersWithWeightedClusters(node, routes, push, port, configMeta, destinationRule)
}
// buildMongoFilter builds an outbound Envoy MongoProxy filter.
func buildMongoFilter(statPrefix string) *listener.Filter {
// TODO: add a watcher for /var/lib/istio/mongo/certs
// if certs are found use, TLS or mTLS clusters for talking to MongoDB.
// User is responsible for mounting those certs in the pod.
mongoProxy := &mongo.MongoProxy{
StatPrefix: statPrefix, // mongo stats are prefixed with mongo.<statPrefix> by Envoy
// TODO enable faults in mongo
}
out := &listener.Filter{
Name: wellknown.MongoProxy,
ConfigType: &listener.Filter_TypedConfig{TypedConfig: util.MessageToAny(mongoProxy)},
}
return out
}
// buildOutboundAutoPassthroughFilterStack builds a filter stack with sni_cluster and tcp
// used by auto_passthrough gateway servers
func buildOutboundAutoPassthroughFilterStack(push *model.PushContext, node *model.Proxy, port *model.Port) []*listener.Filter {
// First build tcp with access logs
// then add sni_cluster to the front
tcpProxy := buildOutboundNetworkFiltersWithSingleDestination(push, node, util.BlackHoleCluster, util.BlackHoleCluster, "", port, nil)
filterstack := make([]*listener.Filter, 0)
filterstack = append(filterstack, &listener.Filter{
Name: util.SniClusterFilter,
})
filterstack = append(filterstack, tcpProxy...)
return filterstack
}
// buildRedisFilter builds an outbound Envoy RedisProxy filter.
// Currently, if multiple clusters are defined, one of them will be picked for
// configuring the Redis proxy.
func buildRedisFilter(statPrefix, clusterName string) *listener.Filter {
redisProxy := &redis.RedisProxy{
LatencyInMicros: true, // redis latency stats are captured in micro seconds which is typically the case.
StatPrefix: statPrefix, // redis stats are prefixed with redis.<statPrefix> by Envoy
Settings: &redis.RedisProxy_ConnPoolSettings{
OpTimeout: durationpb.New(redisOpTimeout),
},
PrefixRoutes: &redis.RedisProxy_PrefixRoutes{
CatchAllRoute: &redis.RedisProxy_PrefixRoutes_Route{
Cluster: clusterName,
},
},
}
out := &listener.Filter{
Name: wellknown.RedisProxy,
ConfigType: &listener.Filter_TypedConfig{TypedConfig: util.MessageToAny(redisProxy)},
}
return out
}
// buildMySQLFilter builds an outbound Envoy MySQLProxy filter.
func buildMySQLFilter(statPrefix string) *listener.Filter {
mySQLProxy := &mysql.MySQLProxy{
StatPrefix: statPrefix, // MySQL stats are prefixed with mysql.<statPrefix> by Envoy.
}
out := &listener.Filter{
Name: wellknown.MySQLProxy,
ConfigType: &listener.Filter_TypedConfig{TypedConfig: util.MessageToAny(mySQLProxy)},
}
return out
}