feat: subset translation (#497)
diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go b/pkg/kube/apisix/apis/config/v2alpha1/types.go
index 1fdf482..c978e92 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/types.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go
@@ -174,6 +174,9 @@
ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
// Weight of this backend.
Weight *int `json:"weight" yaml:"weight"`
+ // Subset specifies a subset for the target Service. The subset should be pre-defined
+ // in ApisixUpstream about this service.
+ Subset string `json:"subset" yaml:"subset"`
}
// ApisixRouteHTTPPlugin represents an APISIX plugin.
@@ -232,6 +235,9 @@
// wise, the service ClusterIP or ExternalIP will be used,
// default is endpoints.
ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
+ // Subset specifies a subset for the target Service. The subset should be pre-defined
+ // in ApisixUpstream about this service.
+ Subset string `json:"subset" yaml:"subset"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/translation/apisix_route.go b/pkg/kube/translation/apisix_route.go
index dceda4f..2360d72 100644
--- a/pkg/kube/translation/apisix_route.go
+++ b/pkg/kube/translation/apisix_route.go
@@ -64,7 +64,7 @@
route.UpstreamId = id.GenID(upstreamName)
if !ctx.checkUpstreamExist(upstreamName) {
- ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, int32(p.Backend.ServicePort))
+ ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, "", int32(p.Backend.ServicePort))
if err != nil {
return nil, err
}
@@ -181,7 +181,7 @@
}
ctx.addRoute(route)
if !ctx.checkUpstreamExist(upstreamName) {
- ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+ ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
@@ -312,10 +312,7 @@
name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
sr.ID = id.GenID(name)
sr.ServerPort = part.Match.IngressPort
- // TODO use upstream id to refer the upstream object.
- // Currently, APISIX doesn't use upstream_id field in
- // APISIX, so we have to embed the entire upstream.
- ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+ ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
diff --git a/pkg/kube/translation/ingress.go b/pkg/kube/translation/ingress.go
index ab0ea6f..5169b68 100644
--- a/pkg/kube/translation/ingress.go
+++ b/pkg/kube/translation/ingress.go
@@ -169,7 +169,7 @@
} else {
svcPort = backend.Port.Number
}
- ups, err := t.TranslateUpstream(namespace, backend.Name, svcPort)
+ ups, err := t.TranslateUpstream(namespace, backend.Name, "", svcPort)
if err != nil {
return nil, err
}
@@ -260,7 +260,7 @@
} else {
portNumber = svcPort.IntVal
}
- ups, err := t.TranslateUpstream(namespace, svcName, portNumber)
+ ups, err := t.TranslateUpstream(namespace, svcName, "", portNumber)
if err != nil {
return nil, err
}
diff --git a/pkg/kube/translation/plugin.go b/pkg/kube/translation/plugin.go
index d58379d..3cc6490 100644
--- a/pkg/kube/translation/plugin.go
+++ b/pkg/kube/translation/plugin.go
@@ -38,7 +38,7 @@
if err != nil {
return nil, err
}
- ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+ ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return nil, err
}
diff --git a/pkg/kube/translation/translator.go b/pkg/kube/translation/translator.go
index e15a6f7..aed3251 100644
--- a/pkg/kube/translation/translator.go
+++ b/pkg/kube/translation/translator.go
@@ -25,6 +25,7 @@
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
+ "github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -54,7 +55,11 @@
// The returned Upstream doesn't have metadata info.
// It doesn't assign any metadata fields, so it's caller's responsibility to decide
// the metadata.
- TranslateUpstream(string, string, int32) (*apisixv1.Upstream, error)
+ // Note the subset is used to filter the ultimate node list, only pods whose labels
+ // matching the subset labels (defined in ApisixUpstream) will be selected.
+ // When the subset is not found, the node list will be empty. When the subset is empty,
+ // all pods IP will be filled.
+ TranslateUpstream(string, string, string, int32) (*apisixv1.Upstream, error)
// TranslateIngress composes a couple of APISIX Routes and upstreams according
// to the given Ingress resource.
TranslateIngress(kube.Ingress) (*TranslateContext, error)
@@ -77,6 +82,7 @@
// TranslatorOptions contains options to help Translator
// work well.
type TranslatorOptions struct {
+ PodCache types.PodCache
PodLister listerscorev1.PodLister
EndpointsLister listerscorev1.EndpointsLister
ServiceLister listerscorev1.ServiceLister
@@ -112,7 +118,7 @@
return ups, nil
}
-func (t *translator) TranslateUpstream(namespace, name string, port int32) (*apisixv1.Upstream, error) {
+func (t *translator) TranslateUpstream(namespace, name, subset string, port int32) (*apisixv1.Upstream, error) {
endpoints, err := t.EndpointsLister.Endpoints(namespace).Get(name)
if err != nil {
return nil, &translateError{
@@ -128,7 +134,13 @@
au, err := t.ApisixUpstreamLister.ApisixUpstreams(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
- ups.Nodes = nodes
+ // If subset in ApisixRoute is not empty but the ApisixUpstream resouce not found,
+ // just set an empty node list.
+ if subset != "" {
+ ups.Nodes = apisixv1.UpstreamNodes{}
+ } else {
+ ups.Nodes = nodes
+ }
return ups, nil
}
return nil, &translateError{
@@ -136,6 +148,19 @@
reason: err.Error(),
}
}
+
+ // Filter nodes by subset.
+ if subset != "" {
+ var labels types.Labels
+ for _, ss := range au.Spec.Subsets {
+ if ss.Name == subset {
+ labels = ss.Labels
+ break
+ }
+ }
+ nodes = t.filterNodesByLabels(nodes, labels, au.Namespace)
+ }
+
upsCfg := &au.Spec.ApisixUpstreamConfig
for _, pls := range au.Spec.PortLevelSettings {
if pls.Port == port {
diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go
index 16ced5a..3584205 100644
--- a/pkg/kube/translation/util.go
+++ b/pkg/kube/translation/util.go
@@ -24,6 +24,7 @@
"github.com/apache/apisix-ingress-controller/pkg/id"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
"github.com/apache/apisix-ingress-controller/pkg/log"
+ "github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -109,8 +110,8 @@
return svc.Spec.ClusterIP, svcPort, nil
}
-func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) {
- ups, err := t.TranslateUpstream(namespace, svcName, svcPort)
+func (t *translator) translateUpstream(namespace, svcName, subset, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) {
+ ups, err := t.TranslateUpstream(namespace, svcName, subset, svcPort)
if err != nil {
return nil, err
}
@@ -128,6 +129,36 @@
return ups, nil
}
+func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels types.Labels, namespace string) apisixv1.UpstreamNodes {
+ if labels == nil {
+ return nodes
+ }
+
+ var filteredNodes apisixv1.UpstreamNodes
+ for _, node := range nodes {
+ podName, err := t.PodCache.GetNameByIP(node.Host)
+ if err != nil {
+ log.Errorw("failed to find pod name by ip, ignore it",
+ zap.Error(err),
+ zap.String("pod_ip", node.Host),
+ )
+ continue
+ }
+ pod, err := t.PodLister.Pods(namespace).Get(podName)
+ if err != nil {
+ log.Errorw("failed to find pod, ignore it",
+ zap.Error(err),
+ zap.String("pod_name", podName),
+ )
+ continue
+ }
+ if labels.IsSubsetOf(pod.Labels) {
+ filteredNodes = append(filteredNodes, node)
+ }
+ }
+ return filteredNodes
+}
+
func validateRemoteAddrs(remoteAddrs []string) error {
for _, addr := range remoteAddrs {
if ip := net.ParseIP(addr); ip == nil {
diff --git a/samples/deploy/crd/v1beta1/ApisixRoute.yaml b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
index 89f4e1a..974795e 100644
--- a/samples/deploy/crd/v1beta1/ApisixRoute.yaml
+++ b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
@@ -166,6 +166,8 @@
weight:
type: integer
minimum: 0
+ subset:
+ type: string
required:
- serviceName
- servicePort
@@ -188,6 +190,8 @@
weight:
type: integer
minimum: 0
+ subset:
+ type: string
required:
- serviceName
- servicePort
@@ -238,6 +242,8 @@
resolveGranualrity:
type: string
enum: ["endpoint", "service"]
+ subset:
+ type: string
required:
- serviceName
- servicePort