blob: 5a49c304c30a7f52bb3bb7cc403c3e315ee922a0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kube
import (
"errors"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
listerscorev1 "k8s.io/client-go/listers/core/v1"
listersdiscoveryv1 "k8s.io/client-go/listers/discovery/v1"
"k8s.io/client-go/tools/cache"
"github.com/apache/apisix-ingress-controller/pkg/log"
)
// HostPort pairs the address of a single backend endpoint with the port
// number it is reachable on.
type HostPort struct {
// Host is the endpoint address. Within this file it is filled from an
// Endpoints address IP or from an EndpointSlice endpoint address.
Host string
// Port is the endpoint port number resolved for the requested service port.
Port int
}
// EndpointLister is an encapsulation for the listers of Kubernetes
// Endpoints and EndpointSlice objects.
type EndpointLister interface {
// GetEndpoint fetches the Endpoint identified by a namespace and a name
// (the two string arguments, in that order). The implementation decides
// on its own whether the result is backed by a Kubernetes Endpoints
// object or by EndpointSlices.
GetEndpoint(string, string) (Endpoint, error)
}
// endpointLister implements EndpointLister on top of the client-go listers
// for Endpoints and EndpointSlices.
type endpointLister struct {
// useEndpointSlice selects the backing source in GetEndpoint:
// EndpointSlices when true, Endpoints when false.
useEndpointSlice bool
// epLister lists Endpoints objects; getEndpoint panics when it is nil.
epLister listerscorev1.EndpointsLister
// epsLister lists EndpointSlice objects; getEndpointSlices panics when it is nil.
epsLister listersdiscoveryv1.EndpointSliceLister
}
// GetEndpoint returns the Endpoint for the given namespace and name. The
// lookup is served from EndpointSlices when the lister was configured with
// useEndpointSlice, and from Endpoints otherwise.
func (lister *endpointLister) GetEndpoint(namespace, name string) (Endpoint, error) {
	if !lister.useEndpointSlice {
		return lister.getEndpoint(namespace, name)
	}
	return lister.getEndpointSlices(namespace, name)
}
// getEndpoint looks up the Endpoints object namespace/name through the
// Endpoints lister and wraps it as an Endpoint.
//
// It panics when the lister was built without an Endpoints lister. A lookup
// error is returned as is. A nil object without an error is only logged; the
// returned Endpoint then wraps a nil Endpoints object.
func (lister *endpointLister) getEndpoint(namespace, name string) (Endpoint, error) {
	if lister.epLister == nil {
		panic("not a endpoint lister")
	}

	found, err := lister.epLister.Endpoints(namespace).Get(name)
	switch {
	case err != nil:
		return nil, err
	case found == nil:
		log.Warnw("get endpoints but found nil",
			zap.String("namespace", namespace),
			zap.String("name", name),
		)
	}

	wrapped := &endpoint{
		endpointType: endpointTypeEndpoints,
		endpoint:     found,
	}
	return wrapped, nil
}
// getEndpointSlices lists the EndpointSlices in namespace that carry the
// discoveryv1.LabelServiceName label set to svcName and wraps them as an
// Endpoint.
//
// It panics when the lister was built without an EndpointSlice lister. A
// list error is returned as is. An empty result is only logged; the returned
// Endpoint then holds no slices.
func (lister *endpointLister) getEndpointSlices(namespace, svcName string) (Endpoint, error) {
	if lister.epsLister == nil {
		panic("not a endpointSlice lister")
	}

	ownerLabels := labels.Set{discoveryv1.LabelServiceName: svcName}
	selector := labels.SelectorFromSet(ownerLabels)

	found, err := lister.epsLister.EndpointSlices(namespace).List(selector)
	if err != nil {
		return nil, err
	}

	if len(found) == 0 {
		log.Warnw("get endpoint slices but found empty slice",
			zap.String("namespace", namespace),
			zap.String("service", svcName),
			zap.Any("selector", selector),
		)
	}

	wrapped := &endpoint{
		endpointType:   endpointTypeEndpointSlices,
		endpointSlices: found,
	}
	return wrapped, nil
}
// Endpoint is an encapsulation for the Kubernetes Endpoints and EndpointSlice
// objects, hiding which of the two backs a given value.
type Endpoint interface {
// ServiceName returns the name of the service that owns this endpoint.
ServiceName() string
// Namespace returns the namespace the endpoint resides in, or an error
// when it cannot be determined.
Namespace() (string, error)
// Endpoints returns the host/port pairs of the endpoints that match the
// given ServicePort.
Endpoints(port *corev1.ServicePort) []HostPort
}
// endpointType identifies which kind of Kubernetes object backs an endpoint
// value.
type endpointType string
const (
// endpointTypeEndpoints marks an endpoint backed by a corev1.Endpoints object.
endpointTypeEndpoints endpointType = "Endpoint"
// endpointTypeEndpointSlices marks an endpoint backed by a list of
// discoveryv1.EndpointSlice objects.
endpointTypeEndpointSlices endpointType = "EndpointSlices"
)
// endpoint is the Endpoint implementation in this package. It holds either an
// Endpoints object or a list of EndpointSlices, tagged by endpointType.
type endpoint struct {
// endpointType records which of the two fields below is expected to be set.
endpointType endpointType
// endpoint is the backing Endpoints object for endpointTypeEndpoints.
endpoint *corev1.Endpoints
// endpointSlices are the backing slices for endpointTypeEndpointSlices;
// the list may be empty.
endpointSlices []*discoveryv1.EndpointSlice
}
// ServiceName returns the name of the service that owns this endpoint.
//
// When an Endpoints object is present its name is returned. Otherwise the
// name is read from the discoveryv1.LabelServiceName label of the first
// EndpointSlice. If there is nothing to read from (no Endpoints object and
// the slice list is empty or starts with a nil entry, e.g. because the lister
// matched no EndpointSlice), an empty string is returned rather than
// panicking on the index or pointer access.
func (e *endpoint) ServiceName() string {
	if e.endpoint != nil {
		return e.endpoint.Name
	}
	// The lister may legitimately hand back zero slices, so the first
	// element must not be assumed to exist.
	if len(e.endpointSlices) == 0 || e.endpointSlices[0] == nil {
		return ""
	}
	return e.endpointSlices[0].Labels[discoveryv1.LabelServiceName]
}
// Namespace returns the namespace of the backing object selected by
// endpointType: the Endpoints object, or the first EndpointSlice.
//
// An error is returned when the backing object is missing (nil Endpoints or
// an empty slice list) or when endpointType is not one of the known values.
func (e *endpoint) Namespace() (string, error) {
	switch e.endpointType {
	case endpointTypeEndpoints:
		if e.endpoint == nil {
			return "", errors.New("endpoint is nil")
		}
		return e.endpoint.Namespace, nil
	case endpointTypeEndpointSlices:
		if len(e.endpointSlices) == 0 {
			return "", errors.New("endpoint slice is empty")
		}
		return e.endpointSlices[0].Namespace, nil
	default:
		return "", errors.New("unknown endpoint type " + string(e.endpointType))
	}
}
// Endpoints resolves svcPort against the backing object and returns one
// HostPort per matching endpoint address. The result is nil when nothing
// matches.
//
// With an Endpoints object, each subset is searched for the first port whose
// name equals svcPort.Name; every address of a subset with such a port is
// emitted with that port number. Without one, the EndpointSlices are handled
// the same way per slice, except that ports lacking a name or a number are
// skipped and endpoints whose Ready condition is explicitly false are ignored.
func (e *endpoint) Endpoints(svcPort *corev1.ServicePort) []HostPort {
	var hostPorts []HostPort

	if e.endpoint == nil {
		for _, eps := range e.endpointSlices {
			matched := -1
			for _, p := range eps.Ports {
				// TODO Consider the case that port not restricted.
				if p.Name == nil || *p.Name != svcPort.Name || p.Port == nil {
					continue
				}
				matched = int(*p.Port)
				break
			}
			if matched == -1 {
				// This slice does not expose the requested port.
				continue
			}
			for _, target := range eps.Endpoints {
				// Ignore not ready endpoints; an unset Ready condition
				// is treated as ready.
				if ready := target.Conditions.Ready; ready != nil && !*ready {
					continue
				}
				for _, address := range target.Addresses {
					hostPorts = append(hostPorts, HostPort{Host: address, Port: matched})
				}
			}
		}
		return hostPorts
	}

	for _, subset := range e.endpoint.Subsets {
		matched := -1
		for _, p := range subset.Ports {
			if p.Name == svcPort.Name {
				matched = int(p.Port)
				break
			}
		}
		if matched == -1 {
			// This subset does not expose the requested port.
			continue
		}
		for _, address := range subset.Addresses {
			hostPorts = append(hostPorts, HostPort{Host: address.IP, Port: matched})
		}
	}
	return hostPorts
}
// NewEndpointListerAndInformer builds an EndpointLister together with the
// shared index informer that feeds it. Both are obtained from factory: the
// discovery/v1 EndpointSlices informer when useEndpointSlice is true, the
// core/v1 Endpoints informer otherwise.
func NewEndpointListerAndInformer(factory informers.SharedInformerFactory, useEndpointSlice bool) (EndpointLister, cache.SharedIndexInformer) {
	lister := &endpointLister{useEndpointSlice: useEndpointSlice}

	if useEndpointSlice {
		sliceInformers := factory.Discovery().V1().EndpointSlices()
		lister.epsLister = sliceInformers.Lister()
		return lister, sliceInformers.Informer()
	}

	endpointsInformers := factory.Core().V1().Endpoints()
	lister.epLister = endpointsInformers.Lister()
	return lister, endpointsInformers.Informer()
}
// NewEndpoint wraps the given Kubernetes Endpoints object as an Endpoint.
func NewEndpoint(ep *corev1.Endpoints) Endpoint {
	wrapped := new(endpoint)
	wrapped.endpointType = endpointTypeEndpoints
	wrapped.endpoint = ep
	return wrapped
}
// NewEndpointWithSlice wraps the given Kubernetes EndpointSlice as an
// Endpoint whose slice list contains exactly that one slice.
func NewEndpointWithSlice(ep *discoveryv1.EndpointSlice) Endpoint {
	wrapped := new(endpoint)
	wrapped.endpointType = endpointTypeEndpointSlices
	wrapped.endpointSlices = []*discoveryv1.EndpointSlice{ep}
	return wrapped
}