// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ingress

import (
	"context"
	"net"
	"sort"
	"strings"
	"time"
)

import (
	"istio.io/pkg/log"
	coreV1 "k8s.io/api/core/v1"
	"k8s.io/api/networking/v1beta1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	listerv1 "k8s.io/client-go/listers/core/v1"
	listerv1beta1 "k8s.io/client-go/listers/networking/v1beta1"
)

import (
	"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
	kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube"
	"github.com/apache/dubbo-go-pixiu/pkg/queue"
)

const (
	updateInterval = 60 * time.Second
)

// StatusSyncer keeps the status IP in each Ingress resource updated
type StatusSyncer struct {
	meshHolder         mesh.Holder
	client             kubernetes.Interface
	queue              queue.Instance
	ingressLister      listerv1beta1.IngressLister
	podLister          listerv1.PodLister
	serviceLister      listerv1.ServiceLister
	nodeLister         listerv1.NodeLister
	ingressClassLister listerv1beta1.IngressClassLister
}

// Run the syncer until stopCh is closed
func (s *StatusSyncer) Run(stopCh <-chan struct{}) {
	go s.queue.Run(stopCh)
	go s.runUpdateStatus(stopCh)
}

// NewStatusSyncer creates a new instance
func NewStatusSyncer(meshHolder mesh.Holder, client kubelib.Client) *StatusSyncer {
	// As in the controller, ingressClassLister can be nil, since IngressClass is not supported in Kubernetes versions <1.18.
	var ingressClassLister listerv1beta1.IngressClassLister
	if NetworkingIngressAvailable(client) {
		ingressClassLister = client.KubeInformer().Networking().V1beta1().IngressClasses().Lister()
	}

	// The queue requires a retry delay to apply after a handler error.
	q := queue.NewQueue(5 * time.Second)

	return &StatusSyncer{
		meshHolder:         meshHolder,
		client:             client,
		ingressLister:      client.KubeInformer().Networking().V1beta1().Ingresses().Lister(),
		podLister:          client.KubeInformer().Core().V1().Pods().Lister(),
		serviceLister:      client.KubeInformer().Core().V1().Services().Lister(),
		nodeLister:         client.KubeInformer().Core().V1().Nodes().Lister(),
		ingressClassLister: ingressClassLister,
		queue:              q,
	}
}
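
// onEvent recomputes the addresses of the running ingress gateway and pushes
// them into the status of every Ingress resource this syncer targets.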
func (s *StatusSyncer) onEvent() error {
	addrs, err := s.runningAddresses(ingressNamespace)
	if err != nil {
		if kerrors.IsNotFound(err) {
			return nil
		}
		return err
	}

	return s.updateStatus(sliceToStatus(addrs))
}
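
// runUpdateStatus first waits (when the initial lookup fails) until ingress
// addresses can be resolved, then pushes a status-update task onto the queue
// every updateInterval until stop is closed.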
func (s *StatusSyncer) runUpdateStatus(stop <-chan struct{}) {
	if _, err := s.runningAddresses(ingressNamespace); err != nil {
		log.Warn("Missing ingress, skip status updates")
		err = wait.PollUntil(10*time.Second, func() (bool, error) {
			if sa, err := s.runningAddresses(ingressNamespace); err != nil || len(sa) == 0 {
				return false, nil
			}
			return true, nil
		}, stop)
		if err != nil {
			log.Warn("Error waiting for ingress")
			return
		}
	}
	err := wait.PollUntil(updateInterval, func() (bool, error) {
		s.queue.Push(s.onEvent)
		return false, nil
	}, stop)
	if err != nil {
		log.Errorf("Stop requested")
	}
}

// updateStatus updates the ingress status with the list of IPs
func (s *StatusSyncer) updateStatus(status []coreV1.LoadBalancerIngress) error {
	l, err := s.ingressLister.List(labels.Everything())
	if err != nil {
		return err
	}
	if len(l) == 0 {
		return nil
	}

	sort.SliceStable(status, lessLoadBalancerIngress(status))

	for _, currIng := range l {
		shouldTarget, err := s.shouldTargetIngress(currIng)
		if err != nil {
			log.Warnf("error determining whether should target ingress for status update: %v", err)
			return err
		}
		if !shouldTarget {
			continue
		}

		curIPs := currIng.Status.LoadBalancer.Ingress
		sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))

		if ingressSliceEqual(status, curIPs) {
			log.Debugf("skipping update of Ingress %v/%v (no change)", currIng.Namespace, currIng.Name)
			continue
		}

		currIng.Status.LoadBalancer.Ingress = status

		_, err = s.client.NetworkingV1beta1().Ingresses(currIng.Namespace).UpdateStatus(context.TODO(), currIng, metaV1.UpdateOptions{})
		if err != nil {
			log.Warnf("error updating ingress status: %v", err)
		}
	}

	return nil
}

// runningAddresses returns a list of IP addresses and/or FQDNs in the namespace
// where the ingress controller is currently running
func (s *StatusSyncer) runningAddresses(ingressNs string) ([]string, error) {
	addrs := make([]string, 0)
	ingressService := s.meshHolder.Mesh().IngressService
	ingressSelector := s.meshHolder.Mesh().IngressSelector

	if ingressService != "" {
		svc, err := s.serviceLister.Services(ingressNs).Get(ingressService)
		if err != nil {
			return nil, err
		}

		if svc.Spec.Type == coreV1.ServiceTypeExternalName {
			addrs = append(addrs, svc.Spec.ExternalName)
			return addrs, nil
		}

		for _, ip := range svc.Status.LoadBalancer.Ingress {
			if ip.IP == "" {
				addrs = append(addrs, ip.Hostname)
			} else {
				addrs = append(addrs, ip.IP)
			}
		}

		addrs = append(addrs, svc.Spec.ExternalIPs...)
		return addrs, nil
	}

	// get all pods acting as ingress gateways
	igSelector := getIngressGatewaySelector(ingressSelector, ingressService)
	igPods, err := s.podLister.Pods(ingressNamespace).List(labels.SelectorFromSet(igSelector))
	if err != nil {
		return nil, err
	}

	for _, pod := range igPods {
		// only Running pods are valid
		if pod.Status.Phase != coreV1.PodRunning {
			continue
		}

		// Find node external IP
		node, err := s.nodeLister.Get(pod.Spec.NodeName)
		if err != nil {
			continue
		}

		for _, address := range node.Status.Addresses {
			if address.Type == coreV1.NodeExternalIP {
				if address.Address != "" && !addressInSlice(address.Address, addrs) {
					addrs = append(addrs, address.Address)
				}
			}
		}
	}

	return addrs, nil
}
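
// addressInSlice reports whether addr is already present in list.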
func addressInSlice(addr string, list []string) bool {
	for _, v := range list {
		if v == addr {
			return true
		}
	}
	return false
}

// sliceToStatus converts a slice of IPs and/or hostnames to LoadBalancerIngress
func sliceToStatus(endpoints []string) []coreV1.LoadBalancerIngress {
	lbi := make([]coreV1.LoadBalancerIngress, 0, len(endpoints))
	for _, ep := range endpoints {
		if net.ParseIP(ep) == nil {
			lbi = append(lbi, coreV1.LoadBalancerIngress{Hostname: ep})
		} else {
			lbi = append(lbi, coreV1.LoadBalancerIngress{IP: ep})
		}
	}
	return lbi
}
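
// lessLoadBalancerIngress returns a sort.SliceStable comparator that orders
// entries by Hostname first and then by IP.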
func lessLoadBalancerIngress(addrs []coreV1.LoadBalancerIngress) func(int, int) bool {
	return func(a, b int) bool {
		switch strings.Compare(addrs[a].Hostname, addrs[b].Hostname) {
		case -1:
			return true
		case 1:
			return false
		}
		return addrs[a].IP < addrs[b].IP
	}
}
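
// ingressSliceEqual reports whether lhs and rhs contain the same IPs and
// hostnames in the same order.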
func ingressSliceEqual(lhs, rhs []coreV1.LoadBalancerIngress) bool {
	if len(lhs) != len(rhs) {
		return false
	}
	for i := range lhs {
		if lhs[i].IP != rhs[i].IP {
			return false
		}
		if lhs[i].Hostname != rhs[i].Hostname {
			return false
		}
	}
	return true
}

// shouldTargetIngress determines whether the status watcher should target a given ingress resource
func (s *StatusSyncer) shouldTargetIngress(ingress *v1beta1.Ingress) (bool, error) {
	var ingressClass *v1beta1.IngressClass
	if s.ingressClassLister != nil && ingress.Spec.IngressClassName != nil {
		c, err := s.ingressClassLister.Get(*ingress.Spec.IngressClassName)
		if err != nil && !kerrors.IsNotFound(err) {
			return false, err
		}
		ingressClass = c
	}
	return shouldProcessIngressWithClass(s.meshHolder.Mesh(), ingress, ingressClass), nil
}