blob: 1ca20456bc551cb7a4cc078c2078fbf479fb6dcb [file] [log] [blame]
// Copyright Istio Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
import (
import (
// NetworkGateway is the gateway of a network
type NetworkGateway struct {
// Network is the ID of the network where this Gateway resides.
Network network.ID
// Cluster is the ID of the k8s cluster where this Gateway resides.
Cluster cluster.ID
// gateway ip address
Addr string
// gateway port
Port uint32
type NetworkGatewaysWatcher interface {
NetworkGateways() []NetworkGateway
AppendNetworkGatewayHandler(h func())
// NetworkGatewaysHandler can be embedded to easily implement NetworkGatewaysWatcher.
type NetworkGatewaysHandler struct {
handlers []func()
func (ngh *NetworkGatewaysHandler) AppendNetworkGatewayHandler(h func()) {
ngh.handlers = append(ngh.handlers, h)
func (ngh *NetworkGatewaysHandler) NotifyGatewayHandlers() {
for _, handler := range ngh.handlers {
// NewNetworkManager creates a new NetworkManager from the Environment by merging
// together the MeshNetworks and ServiceRegistry-specific gateways.
func NewNetworkManager(env *Environment, xdsUpdater XDSUpdater) (*NetworkManager, error) {
nameCache, err := newNetworkGatewayNameCache()
if err != nil {
return nil, err
mgr := &NetworkManager{env: env, NameCache: nameCache, xdsUpdater: xdsUpdater}
return mgr, nil
func (mgr *NetworkManager) reloadAndPush() {
oldGateways := make(NetworkGatewaySet)
for _, gateway := range mgr.allGateways() {
changed := !mgr.reload().Equals(oldGateways)
if changed && mgr.xdsUpdater != nil {
log.Infof("gateways changed, triggering push")
mgr.xdsUpdater.ConfigUpdate(&PushRequest{Full: true, Reason: []TriggerReason{NetworksTrigger}})
func (mgr *NetworkManager) reload() NetworkGatewaySet {
log.Infof("reloading network gateways")
// Generate a snapshot of the state of gateways by merging the contents of
// MeshNetworks and the ServiceRegistries.
// Store all gateways in a set initially to eliminate duplicates.
gatewaySet := make(NetworkGatewaySet)
// First, load gateways from the static MeshNetworks config.
meshNetworks := mgr.env.NetworksWatcher.Networks()
if meshNetworks != nil {
for nw, networkConf := range meshNetworks.Networks {
for _, gw := range networkConf.Gateways {
if gw.GetAddress() == "" {
// registryServiceName addresses will be populated via kube service registry
Cluster: "", /* TODO(nmittler): Add Cluster to the API */
Network: network.ID(nw),
Addr: gw.GetAddress(),
Port: gw.Port,
}] = struct{}{}
// Second, load registry-specific gateways.
for _, gw := range mgr.env.NetworkGateways() {
// - the internal map of label gateways - these get deleted if the service is deleted, updated if the ip changes etc.
// - the computed map from meshNetworks (triggered by reloadNetworkLookup, the ported logic from getGatewayAddresses)
gatewaySet[gw] = struct{}{}
// Now populate the maps by network and by network+cluster.
byNetwork := make(map[network.ID][]NetworkGateway)
byNetworkAndCluster := make(map[networkAndCluster][]NetworkGateway)
for gw := range gatewaySet {
byNetwork[gw.Network] = append(byNetwork[gw.Network], gw)
nc := networkAndClusterForGateway(&gw)
byNetworkAndCluster[nc] = append(byNetworkAndCluster[nc], gw)
gwNum := []int{}
// Sort the gateways in byNetwork, and also calculate the max number
// of gateways per network.
for k, gws := range byNetwork {
byNetwork[k] = SortGateways(gws)
gwNum = append(gwNum, len(gws))
// Sort the gateways in byNetworkAndCluster.
for k, gws := range byNetworkAndCluster {
byNetworkAndCluster[k] = SortGateways(gws)
gwNum = append(gwNum, len(gws))
lcmVal := 1
// calculate lcm
for _, num := range gwNum {
lcmVal = lcm(lcmVal, num)
mgr.lcm = uint32(lcmVal)
mgr.byNetwork = byNetwork
mgr.byNetworkAndCluster = byNetworkAndCluster
return gatewaySet
func (mgr *NetworkManager) resolveHostnameGateways(gatewaySet map[NetworkGateway]struct{}) {
// filter the list of gateways to resolve
hostnameGateways := map[string][]NetworkGateway{}
names := sets.New()
for gw := range gatewaySet {
if gwIP := net.ParseIP(gw.Addr); gwIP != nil {
delete(gatewaySet, gw)
if !features.ResolveHostnameGateways {
log.Warnf("Failed parsing gateway address %s from Service Registry. "+
"Set RESOLVE_HOSTNAME_GATEWAYS on istiod to enable resolving hostnames in the control plane.",
hostnameGateways[gw.Addr] = append(hostnameGateways[gw.Addr], gw)
// resolve each hostname
for host, addrs := range mgr.NameCache.Resolve(names) {
gwsForHost := hostnameGateways[host]
if len(addrs) == 0 {
log.Warnf("could not resolve hostname %q for %d gateways", host, len(gwsForHost))
// expand each resolved address into a NetworkGateway
for _, gw := range gwsForHost {
for _, resolved := range addrs {
// copy the base gateway to preserve the port/network, but update with the resolved IP
resolvedGw := gw
resolvedGw.Addr = resolved
gatewaySet[resolvedGw] = struct{}{}
// NetworkManager provides gateway details for accessing remote networks.
type NetworkManager struct {
env *Environment
// exported for test
NameCache *networkGatewayNameCache
xdsUpdater XDSUpdater
// least common multiple of gateway number of {per network, per cluster}
mu sync.RWMutex
lcm uint32
byNetwork map[network.ID][]NetworkGateway
byNetworkAndCluster map[networkAndCluster][]NetworkGateway
func (mgr *NetworkManager) IsMultiNetworkEnabled() bool {
if mgr == nil {
return false
return len(mgr.byNetwork) > 0
// GetLBWeightScaleFactor returns the least common multiple of the number of gateways per network.
func (mgr *NetworkManager) GetLBWeightScaleFactor() uint32 {
return mgr.lcm
func (mgr *NetworkManager) AllGateways() []NetworkGateway {
return mgr.allGateways()
func (mgr *NetworkManager) allGateways() []NetworkGateway {
if mgr.byNetwork == nil {
return nil
out := make([]NetworkGateway, 0)
for _, gateways := range mgr.byNetwork {
out = append(out, gateways...)
return SortGateways(out)
func (mgr *NetworkManager) GatewaysByNetwork() map[network.ID][]NetworkGateway {
if mgr.byNetwork == nil {
return nil
out := make(map[network.ID][]NetworkGateway)
for k, v := range mgr.byNetwork {
out[k] = append(make([]NetworkGateway, 0, len(v)), v...)
return out
func (mgr *NetworkManager) GatewaysForNetwork(nw network.ID) []NetworkGateway {
if mgr.byNetwork == nil {
return nil
return mgr.byNetwork[nw]
func (mgr *NetworkManager) GatewaysForNetworkAndCluster(nw network.ID, c cluster.ID) []NetworkGateway {
if mgr.byNetwork == nil {
return nil
return mgr.byNetworkAndCluster[networkAndClusterFor(nw, c)]
type networkAndCluster struct {
network network.ID
cluster cluster.ID
func networkAndClusterForGateway(g *NetworkGateway) networkAndCluster {
return networkAndClusterFor(g.Network, g.Cluster)
func networkAndClusterFor(nw network.ID, c cluster.ID) networkAndCluster {
return networkAndCluster{
network: nw,
cluster: c,
func SortGateways(gws []NetworkGateway) []NetworkGateway {
// Sort the array so that it's stable.
sort.SliceStable(gws, func(i, j int) bool {
if cmp := strings.Compare(gws[i].Addr, gws[j].Addr); cmp < 0 {
return true
return gws[i].Port < gws[j].Port
return gws
// greatest common divisor of x and y
func gcd(x, y int) int {
var tmp int
for {
tmp = x % y
if tmp > 0 {
x = y
y = tmp
} else {
return y
// least common multiple of x and y
func lcm(x, y int) int {
return x * y / gcd(x, y)
// NetworkGatewaySet is a helper to manage a set of NetworkGateway instances.
type NetworkGatewaySet map[NetworkGateway]struct{}
func (s NetworkGatewaySet) Equals(other NetworkGatewaySet) bool {
if len(s) != len(other) {
return false
// deepequal won't catch nil-map == empty map
if len(s) == 0 && len(other) == 0 {
return true
return reflect.DeepEqual(s, other)
func (s NetworkGatewaySet) Add(gw NetworkGateway) {
s[gw] = struct{}{}
func (s NetworkGatewaySet) AddAll(other NetworkGatewaySet) {
for gw := range other {
func (s NetworkGatewaySet) ToArray() []NetworkGateway {
gws := make([]NetworkGateway, 0, len(s))
for gw := range s {
gws = append(gws, gw)
// Sort the array so that it's stable.
gws = SortGateways(gws)
return gws
// MinGatewayTTL is exported for testing
var MinGatewayTTL = 30 * time.Second
type networkGatewayNameCache struct {
client *dnsClient
cache map[string]nameCacheEntry
type nameCacheEntry struct {
value []string
expiry time.Time
timer *time.Timer
func newNetworkGatewayNameCache() (*networkGatewayNameCache, error) {
c, err := newClient()
if err != nil {
return nil, err
return newNetworkGatewayNameCacheWithClient(c), nil
// newNetworkGatewayNameCacheWithClient exported for test
func newNetworkGatewayNameCacheWithClient(c *dnsClient) *networkGatewayNameCache {
return &networkGatewayNameCache{client: c, cache: map[string]nameCacheEntry{}}
// Resolve takes a list of hostnames and returns a map of names to addresses
func (n *networkGatewayNameCache) Resolve(names sets.Set) map[string][]string {
defer n.Unlock()
out := make(map[string][]string, len(names))
for name := range names {
out[name] = n.resolveFromCache(name)
return out
// cleanupWatches cancels any scheduled re-resolve for names we no longer care about
func (n *networkGatewayNameCache) cleanupWatches(names sets.Set) {
for name, entry := range n.cache {
if names.Contains(name) {
delete(n.cache, name)
func (n *networkGatewayNameCache) resolveFromCache(name string) []string {
if entry, ok := n.cache[name]; ok && entry.expiry.After(time.Now()) {
return entry.value
// ideally this will not happen more than once for each name and the cache auto-updates in the background
// even if it does, this happens on the SotW ingestion path (kube or meshnetworks changes) and not xds push path.
return n.resolveAndCache(name)
func (n *networkGatewayNameCache) resolveAndCache(name string) []string {
if entry, ok := n.cache[name]; ok {
delete(n.cache, name)
addrs, ttl := n.resolve(name)
// avoid excessive pushes due to small TTL
if ttl < MinGatewayTTL {
ttl = MinGatewayTTL
expiry := time.Now().Add(ttl)
n.cache[name] = nameCacheEntry{
value: addrs,
expiry: expiry,
// TTL expires, try to refresh TODO should this be < ttl?
timer: time.AfterFunc(ttl, n.refreshAndNotify(name)),
return addrs
// refreshAndNotify is triggered via time.AfterFunc and will recursively schedule itself that way until timer is cleaned
// up via cleanupWatches.
func (n *networkGatewayNameCache) refreshAndNotify(name string) func() {
return func() {
log.Debugf("network gateways: refreshing DNS for %s", name)
old := n.cache[name]
addrs := n.resolveAndCache(name)
if !stringSliceEqual(old.value, addrs) {
log.Debugf("network gateways: DNS for %s changed: %v -> %v", name, old.value, addrs)
// avoid import cycle
func stringSliceEqual(a, b []string) bool {
if len(a) != len(b) {
return false
for i := range a {
if a[i] != b[i] {
return false
return true
// resolve gets all the A and AAAA records for the given name
func (n *networkGatewayNameCache) resolve(name string) ([]string, time.Duration) {
// TODO figure out how to query only A + AAAA
res := n.client.Query(new(dns.Msg).SetQuestion(dns.Fqdn(name), dns.TypeANY))
if res == nil || len(res.Answer) == 0 {
return nil, 0
ttl := uint32(math.MaxUint32)
var out []string
for _, rr := range res.Answer {
switch v := rr.(type) {
case *dns.A:
out = append(out, v.A.String())
case *dns.AAAA:
// TODO may not always want ipv6t?
out = append(out, v.AAAA.String())
// not a valid record, don't inspect TTL
if nextTTL := rr.Header().Ttl; nextTTL < ttl {
ttl = nextTTL
return out, time.Duration(ttl)
// TODO share code with pkg/dns
type dnsClient struct {
resolvConfServers []string
// NetworkGatewayTestDNSServers if set will ignore resolv.conf and use the given DNS servers for tests.
var NetworkGatewayTestDNSServers []string
func newClient() (*dnsClient, error) {
servers := NetworkGatewayTestDNSServers
if len(servers) == 0 {
dnsConfig, err := dns.ClientConfigFromFile("/etc/resolv.conf")
if err != nil {
return nil, err
if dnsConfig != nil {
for _, s := range dnsConfig.Servers {
servers = append(servers, net.JoinHostPort(s, dnsConfig.Port))
// TODO take search namespaces into account
// TODO what about /etc/hosts?
c := &dnsClient{
Client: &dns.Client{
DialTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
c.resolvConfServers = append(c.resolvConfServers, servers...)
return c, nil
func (c *dnsClient) Query(req *dns.Msg) *dns.Msg {
var response *dns.Msg
for _, upstream := range c.resolvConfServers {
cResponse, _, err := c.Exchange(req, upstream)
if err == nil {
response = cResponse
log.Infof("upstream dns failure: %v", err)
if response == nil {
response = new(dns.Msg)
response.Rcode = dns.RcodeServerFailure
return response