| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package model |
| |
| import ( |
| "math" |
| "net" |
| "reflect" |
| "sort" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/miekg/dns" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" |
| "github.com/apache/dubbo-go-pixiu/pkg/cluster" |
| "github.com/apache/dubbo-go-pixiu/pkg/network" |
| "github.com/apache/dubbo-go-pixiu/pkg/util/sets" |
| ) |
| |
| // NetworkGateway is the gateway of a network |
| type NetworkGateway struct { |
| // Network is the ID of the network where this Gateway resides. |
| Network network.ID |
| // Cluster is the ID of the k8s cluster where this Gateway resides. |
| Cluster cluster.ID |
| // gateway ip address |
| Addr string |
| // gateway port |
| Port uint32 |
| } |
| |
| type NetworkGatewaysWatcher interface { |
| NetworkGateways() []NetworkGateway |
| AppendNetworkGatewayHandler(h func()) |
| } |
| |
| // NetworkGatewaysHandler can be embedded to easily implement NetworkGatewaysWatcher. |
| type NetworkGatewaysHandler struct { |
| handlers []func() |
| } |
| |
| func (ngh *NetworkGatewaysHandler) AppendNetworkGatewayHandler(h func()) { |
| ngh.handlers = append(ngh.handlers, h) |
| } |
| |
| func (ngh *NetworkGatewaysHandler) NotifyGatewayHandlers() { |
| for _, handler := range ngh.handlers { |
| handler() |
| } |
| } |
| |
| // NewNetworkManager creates a new NetworkManager from the Environment by merging |
| // together the MeshNetworks and ServiceRegistry-specific gateways. |
| func NewNetworkManager(env *Environment, xdsUpdater XDSUpdater) (*NetworkManager, error) { |
| nameCache, err := newNetworkGatewayNameCache() |
| if err != nil { |
| return nil, err |
| } |
| mgr := &NetworkManager{env: env, NameCache: nameCache, xdsUpdater: xdsUpdater} |
| env.AddNetworksHandler(mgr.reloadAndPush) |
| env.AppendNetworkGatewayHandler(mgr.reloadAndPush) |
| nameCache.AppendNetworkGatewayHandler(mgr.reloadAndPush) |
| mgr.reload() |
| return mgr, nil |
| } |
| |
| func (mgr *NetworkManager) reloadAndPush() { |
| mgr.mu.Lock() |
| defer mgr.mu.Unlock() |
| oldGateways := make(NetworkGatewaySet) |
| for _, gateway := range mgr.allGateways() { |
| oldGateways.Add(gateway) |
| } |
| changed := !mgr.reload().Equals(oldGateways) |
| |
| if changed && mgr.xdsUpdater != nil { |
| log.Infof("gateways changed, triggering push") |
| mgr.xdsUpdater.ConfigUpdate(&PushRequest{Full: true, Reason: []TriggerReason{NetworksTrigger}}) |
| } |
| } |
| |
| func (mgr *NetworkManager) reload() NetworkGatewaySet { |
| log.Infof("reloading network gateways") |
| |
| // Generate a snapshot of the state of gateways by merging the contents of |
| // MeshNetworks and the ServiceRegistries. |
| |
| // Store all gateways in a set initially to eliminate duplicates. |
| gatewaySet := make(NetworkGatewaySet) |
| |
| // First, load gateways from the static MeshNetworks config. |
| meshNetworks := mgr.env.NetworksWatcher.Networks() |
| if meshNetworks != nil { |
| for nw, networkConf := range meshNetworks.Networks { |
| for _, gw := range networkConf.Gateways { |
| if gw.GetAddress() == "" { |
| // registryServiceName addresses will be populated via kube service registry |
| continue |
| } |
| gatewaySet[NetworkGateway{ |
| Cluster: "", /* TODO(nmittler): Add Cluster to the API */ |
| Network: network.ID(nw), |
| Addr: gw.GetAddress(), |
| Port: gw.Port, |
| }] = struct{}{} |
| } |
| } |
| } |
| |
| // Second, load registry-specific gateways. |
| for _, gw := range mgr.env.NetworkGateways() { |
| // - the internal map of label gateways - these get deleted if the service is deleted, updated if the ip changes etc. |
| // - the computed map from meshNetworks (triggered by reloadNetworkLookup, the ported logic from getGatewayAddresses) |
| gatewaySet[gw] = struct{}{} |
| } |
| |
| mgr.resolveHostnameGateways(gatewaySet) |
| |
| // Now populate the maps by network and by network+cluster. |
| byNetwork := make(map[network.ID][]NetworkGateway) |
| byNetworkAndCluster := make(map[networkAndCluster][]NetworkGateway) |
| for gw := range gatewaySet { |
| byNetwork[gw.Network] = append(byNetwork[gw.Network], gw) |
| nc := networkAndClusterForGateway(&gw) |
| byNetworkAndCluster[nc] = append(byNetworkAndCluster[nc], gw) |
| } |
| |
| gwNum := []int{} |
| // Sort the gateways in byNetwork, and also calculate the max number |
| // of gateways per network. |
| for k, gws := range byNetwork { |
| byNetwork[k] = SortGateways(gws) |
| gwNum = append(gwNum, len(gws)) |
| } |
| |
| // Sort the gateways in byNetworkAndCluster. |
| for k, gws := range byNetworkAndCluster { |
| byNetworkAndCluster[k] = SortGateways(gws) |
| gwNum = append(gwNum, len(gws)) |
| } |
| |
| lcmVal := 1 |
| // calculate lcm |
| for _, num := range gwNum { |
| lcmVal = lcm(lcmVal, num) |
| } |
| |
| mgr.lcm = uint32(lcmVal) |
| mgr.byNetwork = byNetwork |
| mgr.byNetworkAndCluster = byNetworkAndCluster |
| |
| return gatewaySet |
| } |
| |
| func (mgr *NetworkManager) resolveHostnameGateways(gatewaySet map[NetworkGateway]struct{}) { |
| // filter the list of gateways to resolve |
| hostnameGateways := map[string][]NetworkGateway{} |
| names := sets.New() |
| for gw := range gatewaySet { |
| if gwIP := net.ParseIP(gw.Addr); gwIP != nil { |
| continue |
| } |
| delete(gatewaySet, gw) |
| if !features.ResolveHostnameGateways { |
| log.Warnf("Failed parsing gateway address %s from Service Registry. "+ |
| "Set RESOLVE_HOSTNAME_GATEWAYS on istiod to enable resolving hostnames in the control plane.", |
| gw.Addr) |
| continue |
| } |
| hostnameGateways[gw.Addr] = append(hostnameGateways[gw.Addr], gw) |
| names.Insert(gw.Addr) |
| } |
| |
| // resolve each hostname |
| for host, addrs := range mgr.NameCache.Resolve(names) { |
| gwsForHost := hostnameGateways[host] |
| if len(addrs) == 0 { |
| log.Warnf("could not resolve hostname %q for %d gateways", host, len(gwsForHost)) |
| } |
| // expand each resolved address into a NetworkGateway |
| for _, gw := range gwsForHost { |
| for _, resolved := range addrs { |
| // copy the base gateway to preserve the port/network, but update with the resolved IP |
| resolvedGw := gw |
| resolvedGw.Addr = resolved |
| gatewaySet[resolvedGw] = struct{}{} |
| } |
| } |
| } |
| } |
| |
| // NetworkManager provides gateway details for accessing remote networks. |
| type NetworkManager struct { |
| env *Environment |
| // exported for test |
| NameCache *networkGatewayNameCache |
| xdsUpdater XDSUpdater |
| |
| // least common multiple of gateway number of {per network, per cluster} |
| mu sync.RWMutex |
| lcm uint32 |
| byNetwork map[network.ID][]NetworkGateway |
| byNetworkAndCluster map[networkAndCluster][]NetworkGateway |
| } |
| |
| func (mgr *NetworkManager) IsMultiNetworkEnabled() bool { |
| if mgr == nil { |
| return false |
| } |
| mgr.mu.RLock() |
| defer mgr.mu.RUnlock() |
| return len(mgr.byNetwork) > 0 |
| } |
| |
| // GetLBWeightScaleFactor returns the least common multiple of the number of gateways per network. |
| func (mgr *NetworkManager) GetLBWeightScaleFactor() uint32 { |
| mgr.mu.RLock() |
| defer mgr.mu.RUnlock() |
| return mgr.lcm |
| } |
| |
| func (mgr *NetworkManager) AllGateways() []NetworkGateway { |
| mgr.mu.RLock() |
| defer mgr.mu.RUnlock() |
| return mgr.allGateways() |
| } |
| |
| func (mgr *NetworkManager) allGateways() []NetworkGateway { |
| if mgr.byNetwork == nil { |
| return nil |
| } |
| out := make([]NetworkGateway, 0) |
| for _, gateways := range mgr.byNetwork { |
| out = append(out, gateways...) |
| } |
| return SortGateways(out) |
| } |
| |
| func (mgr *NetworkManager) GatewaysByNetwork() map[network.ID][]NetworkGateway { |
| mgr.mu.RLock() |
| defer mgr.mu.RUnlock() |
| if mgr.byNetwork == nil { |
| return nil |
| } |
| out := make(map[network.ID][]NetworkGateway) |
| for k, v := range mgr.byNetwork { |
| out[k] = append(make([]NetworkGateway, 0, len(v)), v...) |
| } |
| return out |
| } |
| |
| func (mgr *NetworkManager) GatewaysForNetwork(nw network.ID) []NetworkGateway { |
| mgr.mu.RLock() |
| defer mgr.mu.RUnlock() |
| if mgr.byNetwork == nil { |
| return nil |
| } |
| return mgr.byNetwork[nw] |
| } |
| |
| func (mgr *NetworkManager) GatewaysForNetworkAndCluster(nw network.ID, c cluster.ID) []NetworkGateway { |
| mgr.mu.RLock() |
| defer mgr.mu.RUnlock() |
| if mgr.byNetwork == nil { |
| return nil |
| } |
| return mgr.byNetworkAndCluster[networkAndClusterFor(nw, c)] |
| } |
| |
| type networkAndCluster struct { |
| network network.ID |
| cluster cluster.ID |
| } |
| |
| func networkAndClusterForGateway(g *NetworkGateway) networkAndCluster { |
| return networkAndClusterFor(g.Network, g.Cluster) |
| } |
| |
| func networkAndClusterFor(nw network.ID, c cluster.ID) networkAndCluster { |
| return networkAndCluster{ |
| network: nw, |
| cluster: c, |
| } |
| } |
| |
| func SortGateways(gws []NetworkGateway) []NetworkGateway { |
| // Sort the array so that it's stable. |
| sort.SliceStable(gws, func(i, j int) bool { |
| if cmp := strings.Compare(gws[i].Addr, gws[j].Addr); cmp < 0 { |
| return true |
| } |
| return gws[i].Port < gws[j].Port |
| }) |
| return gws |
| } |
| |
| // greatest common divisor of x and y |
| func gcd(x, y int) int { |
| var tmp int |
| for { |
| tmp = x % y |
| if tmp > 0 { |
| x = y |
| y = tmp |
| } else { |
| return y |
| } |
| } |
| } |
| |
| // least common multiple of x and y |
| func lcm(x, y int) int { |
| return x * y / gcd(x, y) |
| } |
| |
| // NetworkGatewaySet is a helper to manage a set of NetworkGateway instances. |
| type NetworkGatewaySet map[NetworkGateway]struct{} |
| |
| func (s NetworkGatewaySet) Equals(other NetworkGatewaySet) bool { |
| if len(s) != len(other) { |
| return false |
| } |
| // deepequal won't catch nil-map == empty map |
| if len(s) == 0 && len(other) == 0 { |
| return true |
| } |
| return reflect.DeepEqual(s, other) |
| } |
| |
| func (s NetworkGatewaySet) Add(gw NetworkGateway) { |
| s[gw] = struct{}{} |
| } |
| |
| func (s NetworkGatewaySet) AddAll(other NetworkGatewaySet) { |
| for gw := range other { |
| s.Add(gw) |
| } |
| } |
| |
| func (s NetworkGatewaySet) ToArray() []NetworkGateway { |
| gws := make([]NetworkGateway, 0, len(s)) |
| for gw := range s { |
| gws = append(gws, gw) |
| } |
| |
| // Sort the array so that it's stable. |
| gws = SortGateways(gws) |
| return gws |
| } |
| |
| // MinGatewayTTL is exported for testing |
| var MinGatewayTTL = 30 * time.Second |
| |
| type networkGatewayNameCache struct { |
| NetworkGatewaysHandler |
| client *dnsClient |
| |
| sync.Mutex |
| cache map[string]nameCacheEntry |
| } |
| |
| type nameCacheEntry struct { |
| value []string |
| expiry time.Time |
| timer *time.Timer |
| } |
| |
| func newNetworkGatewayNameCache() (*networkGatewayNameCache, error) { |
| c, err := newClient() |
| if err != nil { |
| return nil, err |
| } |
| return newNetworkGatewayNameCacheWithClient(c), nil |
| } |
| |
| // newNetworkGatewayNameCacheWithClient exported for test |
| func newNetworkGatewayNameCacheWithClient(c *dnsClient) *networkGatewayNameCache { |
| return &networkGatewayNameCache{client: c, cache: map[string]nameCacheEntry{}} |
| } |
| |
| // Resolve takes a list of hostnames and returns a map of names to addresses |
| func (n *networkGatewayNameCache) Resolve(names sets.Set) map[string][]string { |
| n.Lock() |
| defer n.Unlock() |
| |
| n.cleanupWatches(names) |
| |
| out := make(map[string][]string, len(names)) |
| for name := range names { |
| out[name] = n.resolveFromCache(name) |
| } |
| |
| return out |
| } |
| |
| // cleanupWatches cancels any scheduled re-resolve for names we no longer care about |
| func (n *networkGatewayNameCache) cleanupWatches(names sets.Set) { |
| for name, entry := range n.cache { |
| if names.Contains(name) { |
| continue |
| } |
| entry.timer.Stop() |
| delete(n.cache, name) |
| } |
| } |
| |
| func (n *networkGatewayNameCache) resolveFromCache(name string) []string { |
| if entry, ok := n.cache[name]; ok && entry.expiry.After(time.Now()) { |
| return entry.value |
| } |
| // ideally this will not happen more than once for each name and the cache auto-updates in the background |
| // even if it does, this happens on the SotW ingestion path (kube or meshnetworks changes) and not xds push path. |
| return n.resolveAndCache(name) |
| } |
| |
| func (n *networkGatewayNameCache) resolveAndCache(name string) []string { |
| if entry, ok := n.cache[name]; ok { |
| entry.timer.Stop() |
| } |
| delete(n.cache, name) |
| addrs, ttl := n.resolve(name) |
| // avoid excessive pushes due to small TTL |
| if ttl < MinGatewayTTL { |
| ttl = MinGatewayTTL |
| } |
| expiry := time.Now().Add(ttl) |
| n.cache[name] = nameCacheEntry{ |
| value: addrs, |
| expiry: expiry, |
| // TTL expires, try to refresh TODO should this be < ttl? |
| timer: time.AfterFunc(ttl, n.refreshAndNotify(name)), |
| } |
| |
| return addrs |
| } |
| |
| // refreshAndNotify is triggered via time.AfterFunc and will recursively schedule itself that way until timer is cleaned |
| // up via cleanupWatches. |
| func (n *networkGatewayNameCache) refreshAndNotify(name string) func() { |
| return func() { |
| log.Debugf("network gateways: refreshing DNS for %s", name) |
| n.Lock() |
| old := n.cache[name] |
| addrs := n.resolveAndCache(name) |
| n.Unlock() |
| |
| if !stringSliceEqual(old.value, addrs) { |
| log.Debugf("network gateways: DNS for %s changed: %v -> %v", name, old.value, addrs) |
| n.NotifyGatewayHandlers() |
| } |
| } |
| } |
| |
| // avoid import cycle |
| func stringSliceEqual(a, b []string) bool { |
| if len(a) != len(b) { |
| return false |
| } |
| |
| for i := range a { |
| if a[i] != b[i] { |
| return false |
| } |
| } |
| |
| return true |
| } |
| |
| // resolve gets all the A and AAAA records for the given name |
| func (n *networkGatewayNameCache) resolve(name string) ([]string, time.Duration) { |
| // TODO figure out how to query only A + AAAA |
| res := n.client.Query(new(dns.Msg).SetQuestion(dns.Fqdn(name), dns.TypeANY)) |
| if res == nil || len(res.Answer) == 0 { |
| return nil, 0 |
| } |
| ttl := uint32(math.MaxUint32) |
| var out []string |
| for _, rr := range res.Answer { |
| switch v := rr.(type) { |
| case *dns.A: |
| out = append(out, v.A.String()) |
| case *dns.AAAA: |
| // TODO may not always want ipv6t? |
| out = append(out, v.AAAA.String()) |
| default: |
| // not a valid record, don't inspect TTL |
| continue |
| } |
| if nextTTL := rr.Header().Ttl; nextTTL < ttl { |
| ttl = nextTTL |
| } |
| } |
| sort.Strings(out) |
| return out, time.Duration(ttl) |
| } |
| |
| // TODO share code with pkg/dns |
| type dnsClient struct { |
| *dns.Client |
| resolvConfServers []string |
| } |
| |
| // NetworkGatewayTestDNSServers if set will ignore resolv.conf and use the given DNS servers for tests. |
| var NetworkGatewayTestDNSServers []string |
| |
| func newClient() (*dnsClient, error) { |
| servers := NetworkGatewayTestDNSServers |
| if len(servers) == 0 { |
| dnsConfig, err := dns.ClientConfigFromFile("/etc/resolv.conf") |
| if err != nil { |
| return nil, err |
| } |
| if dnsConfig != nil { |
| for _, s := range dnsConfig.Servers { |
| servers = append(servers, net.JoinHostPort(s, dnsConfig.Port)) |
| } |
| } |
| // TODO take search namespaces into account |
| // TODO what about /etc/hosts? |
| } |
| |
| c := &dnsClient{ |
| Client: &dns.Client{ |
| DialTimeout: 5 * time.Second, |
| ReadTimeout: 5 * time.Second, |
| WriteTimeout: 5 * time.Second, |
| }, |
| } |
| c.resolvConfServers = append(c.resolvConfServers, servers...) |
| return c, nil |
| } |
| |
| func (c *dnsClient) Query(req *dns.Msg) *dns.Msg { |
| var response *dns.Msg |
| for _, upstream := range c.resolvConfServers { |
| cResponse, _, err := c.Exchange(req, upstream) |
| if err == nil { |
| response = cResponse |
| break |
| } |
| log.Infof("upstream dns failure: %v", err) |
| } |
| if response == nil { |
| response = new(dns.Msg) |
| response.SetReply(req) |
| response.Rcode = dns.RcodeServerFailure |
| } |
| return response |
| } |