blob: f3d9e3159a0682cd0cceee5a2e986475639211b9 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package serviceregistry_test
import (
"context"
"encoding/json"
"fmt"
"reflect"
"sort"
"testing"
"time"
)
import (
meshconfig "istio.io/api/mesh/v1alpha1"
"istio.io/api/meta/v1alpha1"
networking "istio.io/api/networking/v1alpha3"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model/status"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate"
kubecontroller "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/serviceentry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
"github.com/apache/dubbo-go-pixiu/pilot/test/util"
"github.com/apache/dubbo-go-pixiu/pilot/test/xdstest"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
kubeclient "github.com/apache/dubbo-go-pixiu/pkg/kube"
istiotest "github.com/apache/dubbo-go-pixiu/pkg/test"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
)
func setupTest(t *testing.T) (
*kubecontroller.Controller,
*serviceentry.Controller,
model.ConfigStoreController,
kubernetes.Interface,
*xds.FakeXdsUpdater) {
t.Helper()
client := kubeclient.NewFakeClient()
eventch := make(chan xds.FakeXdsEvent, 100)
xdsUpdater := &xds.FakeXdsUpdater{
Events: eventch,
}
meshWatcher := mesh.NewFixedWatcher(&meshconfig.MeshConfig{})
kc := kubecontroller.NewController(
client,
kubecontroller.Options{
XDSUpdater: xdsUpdater,
DomainSuffix: "cluster.local",
MeshWatcher: meshWatcher,
MeshServiceController: aggregate.NewController(aggregate.Options{MeshHolder: meshWatcher}),
},
)
configController := memory.NewController(memory.Make(collections.Pilot))
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)
})
go configController.Run(stop)
istioStore := model.MakeIstioStore(configController)
se := serviceentry.NewController(configController, istioStore, xdsUpdater)
client.RunAndWait(stop)
kc.AppendWorkloadHandler(se.WorkloadInstanceHandler)
se.AppendWorkloadHandler(kc.WorkloadInstanceHandler)
go kc.Run(stop)
go se.Run(stop)
return kc, se, configController, client.Kube(), xdsUpdater
}
// TestWorkloadInstances is effectively an integration test of composing the Kubernetes service registry with the
// external service registry, which have cross-references by workload instances.
func TestWorkloadInstances(t *testing.T) {
istiotest.SetBoolForTest(t, &features.WorkloadEntryHealthChecks, true)
port := &networking.Port{
Name: "http",
Number: 80,
Protocol: "http",
}
labels := map[string]string{
"app": "foo",
}
namespace := "namespace"
serviceEntry := config.Config{
Meta: config.Meta{
Name: "service-entry",
Namespace: namespace,
GroupVersionKind: gvk.ServiceEntry,
Domain: "cluster.local",
},
Spec: &networking.ServiceEntry{
Hosts: []string{"service.namespace.svc.cluster.local"},
Ports: []*networking.Port{port},
WorkloadSelector: &networking.WorkloadSelector{
Labels: labels,
},
Resolution: networking.ServiceEntry_STATIC,
},
}
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service",
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "http",
Port: 80,
}},
Selector: labels,
ClusterIP: "9.9.9.9",
},
}
headlessService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service",
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "http",
Port: 80,
}},
Selector: labels,
ClusterIP: v1.ClusterIPNone,
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: namespace,
Labels: labels,
},
Status: v1.PodStatus{
PodIP: "1.2.3.4",
Phase: v1.PodPending,
},
}
workloadEntry := config.Config{
Meta: config.Meta{
Name: "workload",
Namespace: namespace,
GroupVersionKind: gvk.WorkloadEntry,
Domain: "cluster.local",
},
Spec: &networking.WorkloadEntry{
Address: "2.3.4.5",
Labels: labels,
},
}
expectedSvc := &model.Service{
Hostname: "service.namespace.svc.cluster.local",
Ports: []*model.Port{{
Name: "http",
Port: 80,
Protocol: "http",
}, {
Name: "http2",
Port: 90,
Protocol: "http",
}},
Attributes: model.ServiceAttributes{
Namespace: namespace,
Name: "service",
LabelSelectors: labels,
},
}
t.Run("Kubernetes only", func(t *testing.T) {
kc, _, _, kube, _ := setupTest(t)
makeService(t, kube, service)
makePod(t, kube, pod)
createEndpoints(t, kube, service.Name, namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{pod.Status.PodIP})
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 80,
}}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("Kubernetes only: headless service", func(t *testing.T) {
kc, _, _, kube, xdsUpdater := setupTest(t)
makeService(t, kube, headlessService)
xdsUpdater.WaitOrFail(t, "svcupdate")
makePod(t, kube, pod)
createEndpoints(t, kube, service.Name, namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{pod.Status.PodIP})
xdsUpdater.WaitOrFail(t, "eds")
xdsUpdater.WaitOrFail(t, "xds")
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 80,
}}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("Kubernetes only: endpoint occur earlier", func(t *testing.T) {
kc, _, _, kube, xdsUpdater := setupTest(t)
makePod(t, kube, pod)
createEndpoints(t, kube, service.Name, namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{pod.Status.PodIP})
waitForEdsUpdate(t, xdsUpdater, 1)
// make service populated later than endpoint
makeService(t, kube, service)
waitForEdsUpdate(t, xdsUpdater, 1)
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 80,
}}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("External only: workLoadEntry port and serviceEntry target port is not set, use serviceEntry port.number", func(t *testing.T) {
_, wc, store, _, _ := setupTest(t)
makeIstioObject(t, store, serviceEntry)
makeIstioObject(t, store, workloadEntry)
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
}}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
})
t.Run("External only: the port name of the workloadEntry and serviceEntry does match, use workloadEntry port to override", func(t *testing.T) {
_, wc, store, _, _ := setupTest(t)
makeIstioObject(t, store, serviceEntry)
makeIstioObject(t, store, config.Config{
Meta: config.Meta{
Name: "workload",
Namespace: namespace,
GroupVersionKind: gvk.WorkloadEntry,
Domain: "cluster.local",
},
Spec: &networking.WorkloadEntry{
Address: "2.3.4.5",
Labels: labels,
Ports: map[string]uint32{
serviceEntry.Spec.(*networking.ServiceEntry).Ports[0].Name: 8080,
},
},
})
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 8080,
}}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
})
t.Run("External only: workloadEntry port is not set, use target port", func(t *testing.T) {
_, wc, store, _, _ := setupTest(t)
makeIstioObject(t, store, config.Config{
Meta: config.Meta{
Name: "service-entry",
Namespace: namespace,
GroupVersionKind: gvk.ServiceEntry,
Domain: "cluster.local",
},
Spec: &networking.ServiceEntry{
Hosts: []string{"service.namespace.svc.cluster.local"},
Ports: []*networking.Port{{
Name: "http",
Number: 80,
Protocol: "http",
TargetPort: 8080,
}},
WorkloadSelector: &networking.WorkloadSelector{
Labels: labels,
},
},
})
makeIstioObject(t, store, workloadEntry)
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 8080,
}}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
})
t.Run("External only: the port name of the workloadEntry and serviceEntry does not match, use target port", func(t *testing.T) {
_, wc, store, _, _ := setupTest(t)
makeIstioObject(t, store, config.Config{
Meta: config.Meta{
Name: "service-entry",
Namespace: namespace,
GroupVersionKind: gvk.ServiceEntry,
Domain: "cluster.local",
},
Spec: &networking.ServiceEntry{
Hosts: []string{"service.namespace.svc.cluster.local"},
Ports: []*networking.Port{{
Name: "http",
Number: 80,
Protocol: "http",
TargetPort: 8080,
}},
WorkloadSelector: &networking.WorkloadSelector{
Labels: labels,
},
},
})
makeIstioObject(t, store, config.Config{
Meta: config.Meta{
Name: "workload",
Namespace: namespace,
GroupVersionKind: gvk.WorkloadEntry,
Domain: "cluster.local",
},
Spec: &networking.WorkloadEntry{
Address: "2.3.4.5",
Labels: labels,
Ports: map[string]uint32{
"different-port-name": 8081,
},
},
})
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 8080,
}}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
})
t.Run("External only: the port name of the workloadEntry and serviceEntry does not match, "+
"and the serivceEntry target port is not set, use serviceEntry port.number", func(t *testing.T) {
_, wc, store, _, _ := setupTest(t)
makeIstioObject(t, store, serviceEntry)
makeIstioObject(t, store, config.Config{
Meta: config.Meta{
Name: "workload",
Namespace: namespace,
GroupVersionKind: gvk.WorkloadEntry,
Domain: "cluster.local",
},
Spec: &networking.WorkloadEntry{
Address: "2.3.4.5",
Labels: labels,
Ports: map[string]uint32{
"different-port-name": 8081,
},
},
})
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
}}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
})
t.Run("Service selects WorkloadEntry", func(t *testing.T) {
kc, _, store, kube, _ := setupTest(t)
makeService(t, kube, service)
makeIstioObject(t, store, workloadEntry)
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
}}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("Service selects WorkloadEntry: wle occur earlier", func(t *testing.T) {
kc, _, store, kube, xdsUpdater := setupTest(t)
makeIstioObject(t, store, workloadEntry)
// Wait no event pushed when workload entry created as no service entry
select {
case ev := <-xdsUpdater.Events:
t.Fatalf("Got %s event, expect none", ev.Kind)
case <-time.After(40 * time.Millisecond):
}
makeService(t, kube, service)
event := xdsUpdater.WaitOrFail(t, "edscache")
if event.Endpoints != 1 {
t.Errorf("expecting 1 endpoints, but got %d ", event.Endpoints)
}
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
}}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("Service selects both pods and WorkloadEntry", func(t *testing.T) {
kc, _, store, kube, xdsUpdater := setupTest(t)
makeService(t, kube, service)
xdsUpdater.WaitOrFail(t, "svcupdate")
makeIstioObject(t, store, workloadEntry)
xdsUpdater.WaitOrFail(t, "eds")
makePod(t, kube, pod)
createEndpoints(t, kube, service.Name, namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{pod.Status.PodIP})
waitForEdsUpdate(t, xdsUpdater, 2)
instances := []ServiceInstanceResponse{
{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 80,
},
{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
},
}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("Service selects both pods and WorkloadEntry: wle occur earlier", func(t *testing.T) {
kc, _, store, kube, xdsUpdater := setupTest(t)
makeIstioObject(t, store, workloadEntry)
// Wait no event pushed when workload entry created as no service entry
select {
case ev := <-xdsUpdater.Events:
t.Fatalf("Got %s event, expect none", ev.Kind)
case <-time.After(200 * time.Millisecond):
}
makePod(t, kube, pod)
createEndpoints(t, kube, service.Name, namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{pod.Status.PodIP})
waitForEdsUpdate(t, xdsUpdater, 1)
makeService(t, kube, service)
waitForEdsUpdate(t, xdsUpdater, 2)
instances := []ServiceInstanceResponse{
{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 80,
},
{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
},
}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("Service selects WorkloadEntry with port name", func(t *testing.T) {
kc, _, store, kube, _ := setupTest(t)
makeService(t, kube, &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service",
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "my-port",
Port: 80,
}},
Selector: labels,
ClusterIP: "9.9.9.9",
},
})
makeIstioObject(t, store, config.Config{
Meta: config.Meta{
Name: "workload",
Namespace: namespace,
GroupVersionKind: gvk.WorkloadEntry,
Domain: "cluster.local",
},
Spec: &networking.WorkloadEntry{
Address: "2.3.4.5",
Labels: labels,
Ports: map[string]uint32{
"my-port": 8080,
},
},
})
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 8080,
}}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("Service selects WorkloadEntry with targetPort name", func(t *testing.T) {
kc, _, store, kube, _ := setupTest(t)
makeService(t, kube, &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service",
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "http",
Port: 80,
TargetPort: intstr.Parse("my-port"),
}},
Selector: labels,
ClusterIP: "9.9.9.9",
},
})
makeIstioObject(t, store, config.Config{
Meta: config.Meta{
Name: "workload",
Namespace: namespace,
GroupVersionKind: gvk.WorkloadEntry,
Domain: "cluster.local",
},
Spec: &networking.WorkloadEntry{
Address: "2.3.4.5",
Labels: labels,
Ports: map[string]uint32{
"my-port": 8080,
},
},
})
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 8080,
}}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("Service selects WorkloadEntry with targetPort number", func(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
makeService(t, s.KubeClient(), &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service",
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.FromInt(8080),
},
{
Name: "http2",
Port: 90,
TargetPort: intstr.FromInt(9090),
},
},
Selector: labels,
ClusterIP: "9.9.9.9",
},
})
makeIstioObject(t, s.Store(), config.Config{
Meta: config.Meta{
Name: "workload",
Namespace: namespace,
GroupVersionKind: gvk.WorkloadEntry,
Domain: "cluster.local",
},
Spec: &networking.WorkloadEntry{
Address: "2.3.4.5",
Labels: labels,
},
})
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 8080,
}}
expectServiceInstances(t, s.KubeRegistry, expectedSvc, 80, instances)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"2.3.4.5:8080"})
instances = []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 9090,
}}
expectServiceInstances(t, s.KubeRegistry, expectedSvc, 90, instances)
expectEndpoints(t, s, "outbound|90||service.namespace.svc.cluster.local", []string{"2.3.4.5:9090"})
})
t.Run("ServiceEntry selects Pod", func(t *testing.T) {
_, wc, store, kube, _ := setupTest(t)
makeIstioObject(t, store, serviceEntry)
makePod(t, kube, pod)
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 80,
}}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
})
t.Run("ServiceEntry selects Pod that is in transit states", func(t *testing.T) {
_, wc, store, kube, _ := setupTest(t)
makeIstioObject(t, store, serviceEntry)
makePod(t, kube, pod)
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 80,
}}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
// when pods become unready, we should see the instances being removed from the registry
setPodUnready(pod)
_, err := kube.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
expectServiceInstances(t, wc, expectedSvc, 80, []ServiceInstanceResponse{})
setPodReady(pod)
_, err = kube.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
})
t.Run("ServiceEntry selects Pod with targetPort number", func(t *testing.T) {
_, wc, store, kube, _ := setupTest(t)
makeIstioObject(t, store, config.Config{
Meta: config.Meta{
Name: "service-entry",
Namespace: namespace,
GroupVersionKind: gvk.ServiceEntry,
Domain: "cluster.local",
},
Spec: &networking.ServiceEntry{
Hosts: []string{"service.namespace.svc.cluster.local"},
Ports: []*networking.Port{{
Name: "http",
Number: 80,
Protocol: "http",
TargetPort: 8080,
}},
WorkloadSelector: &networking.WorkloadSelector{
Labels: labels,
},
},
})
makePod(t, kube, pod)
instances := []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 8080,
}}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
})
t.Run("All directions", func(t *testing.T) {
kc, wc, store, kube, _ := setupTest(t)
makeService(t, kube, service)
makeIstioObject(t, store, serviceEntry)
makePod(t, kube, pod)
createEndpoints(t, kube, service.Name, namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{pod.Status.PodIP})
makeIstioObject(t, store, workloadEntry)
instances := []ServiceInstanceResponse{
{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 80,
},
{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
},
}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
t.Run("All directions with deletion", func(t *testing.T) {
kc, wc, store, kube, _ := setupTest(t)
makeService(t, kube, service)
makeIstioObject(t, store, serviceEntry)
makePod(t, kube, pod)
createEndpoints(t, kube, service.Name, namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{pod.Status.PodIP})
makeIstioObject(t, store, workloadEntry)
instances := []ServiceInstanceResponse{
{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: pod.Status.PodIP,
Port: 80,
},
{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
},
}
expectServiceInstances(t, wc, expectedSvc, 80, instances)
expectServiceInstances(t, kc, expectedSvc, 80, instances)
_ = kube.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
_ = kube.CoreV1().Endpoints(pod.Namespace).Delete(context.TODO(), "service", metav1.DeleteOptions{})
_ = store.Delete(gvk.WorkloadEntry, workloadEntry.Name, workloadEntry.Namespace, nil)
expectServiceInstances(t, wc, expectedSvc, 80, []ServiceInstanceResponse{})
expectServiceInstances(t, kc, expectedSvc, 80, []ServiceInstanceResponse{})
})
t.Run("Service selects WorkloadEntry: update service", func(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
makeService(t, s.KubeClient(), service)
makeIstioObject(t, s.Store(), workloadEntry)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"2.3.4.5:80"})
newSvc := service.DeepCopy()
newSvc.Spec.Ports[0].Port = 8080
makeService(t, s.KubeClient(), newSvc)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
expectEndpoints(t, s, "outbound|8080||service.namespace.svc.cluster.local", []string{"2.3.4.5:8080"})
newSvc.Spec.Ports[0].TargetPort = intstr.IntOrString{IntVal: 9090}
makeService(t, s.KubeClient(), newSvc)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
expectEndpoints(t, s, "outbound|8080||service.namespace.svc.cluster.local", []string{"2.3.4.5:9090"})
if err := s.KubeClient().CoreV1().Services(newSvc.Namespace).Delete(context.Background(), newSvc.Name, metav1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
expectEndpoints(t, s, "outbound|8080||service.namespace.svc.cluster.local", nil)
})
t.Run("Service selects WorkloadEntry: update workloadEntry", func(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
makeService(t, s.KubeClient(), service)
makeIstioObject(t, s.Store(), workloadEntry)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"2.3.4.5:80"})
newWE := workloadEntry.DeepCopy()
newWE.Spec.(*networking.WorkloadEntry).Address = "3.4.5.6"
makeIstioObject(t, s.Store(), newWE)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"3.4.5.6:80"})
if err := s.Store().Delete(gvk.WorkloadEntry, newWE.Name, newWE.Namespace, nil); err != nil {
t.Fatal(err)
}
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
})
t.Run("ServiceEntry selects Pod: update service entry", func(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
makeIstioObject(t, s.Store(), serviceEntry)
makePod(t, s.KubeClient(), pod)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
newSE := serviceEntry.DeepCopy()
newSE.Spec.(*networking.ServiceEntry).Ports = []*networking.Port{{
Name: "http",
Number: 80,
Protocol: "http",
TargetPort: 8080,
}}
makeIstioObject(t, s.Store(), newSE)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:8080"})
newSE = newSE.DeepCopy()
newSE.Spec.(*networking.ServiceEntry).Ports = []*networking.Port{{
Name: "http",
Number: 9090,
Protocol: "http",
TargetPort: 9091,
}}
makeIstioObject(t, s.Store(), newSE)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
expectEndpoints(t, s, "outbound|9090||service.namespace.svc.cluster.local", []string{"1.2.3.4:9091"})
if err := s.Store().Delete(gvk.ServiceEntry, newSE.Name, newSE.Namespace, nil); err != nil {
t.Fatal(err)
}
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
expectEndpoints(t, s, "outbound|9090||service.namespace.svc.cluster.local", nil)
})
t.Run("ServiceEntry selects Pod: update pod", func(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
makeIstioObject(t, s.Store(), serviceEntry)
makePod(t, s.KubeClient(), pod)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
newPod := pod.DeepCopy()
newPod.Status.PodIP = "2.3.4.5"
makePod(t, s.KubeClient(), newPod)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"2.3.4.5:80"})
if err := s.KubeClient().CoreV1().Pods(newPod.Namespace).Delete(context.Background(), newPod.Name, metav1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
})
t.Run("ServiceEntry selects Pod: deleting pod", func(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
makeIstioObject(t, s.Store(), serviceEntry)
makePod(t, s.KubeClient(), pod)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
// Simulate pod being deleted by setting deletion timestamp
newPod := pod.DeepCopy()
newPod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
makePod(t, s.KubeClient(), newPod)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
if err := s.KubeClient().CoreV1().Pods(newPod.Namespace).Delete(context.Background(), newPod.Name, metav1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
})
t.Run("Service selects WorkloadEntry: health status", func(t *testing.T) {
kc, _, store, kube, _ := setupTest(t)
makeService(t, kube, service)
// Start as unhealthy, should have no instances
makeIstioObject(t, store, setHealth(workloadEntry, false))
instances := []ServiceInstanceResponse{}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
// Mark healthy, get instances
makeIstioObject(t, store, setHealth(workloadEntry, true))
instances = []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
}}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
// Set back to unhealthy
makeIstioObject(t, store, setHealth(workloadEntry, false))
instances = []ServiceInstanceResponse{}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
// Remove health status entirely
makeIstioObject(t, store, workloadEntry)
instances = []ServiceInstanceResponse{{
Hostname: expectedSvc.Hostname,
Namestring: expectedSvc.Attributes.Namespace,
Address: workloadEntry.Spec.(*networking.WorkloadEntry).Address,
Port: 80,
}}
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})
}
func setHealth(cfg config.Config, healthy bool) config.Config {
cfg = cfg.DeepCopy()
if cfg.Annotations == nil {
cfg.Annotations = map[string]string{}
}
cfg.Annotations[status.WorkloadEntryHealthCheckAnnotation] = "true"
if healthy {
return status.UpdateConfigCondition(cfg, &v1alpha1.IstioCondition{
Type: status.ConditionHealthy,
Status: status.StatusTrue,
})
}
return status.UpdateConfigCondition(cfg, &v1alpha1.IstioCondition{
Type: status.ConditionHealthy,
Status: status.StatusFalse,
})
}
func waitForEdsUpdate(t *testing.T, xdsUpdater *xds.FakeXdsUpdater, expected int) {
t.Helper()
retry.UntilSuccessOrFail(t, func() error {
event := xdsUpdater.WaitOrFail(t, "eds", "edscache")
if event.Endpoints != expected {
return fmt.Errorf("expecting %d endpoints, but got %d", expected, event.Endpoints)
}
return nil
}, retry.Delay(time.Millisecond*10), retry.Timeout(time.Second))
}
func TestEndpointsDeduping(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{
KubernetesEndpointMode: kubecontroller.EndpointSliceOnly,
})
namespace := "namespace"
labels := map[string]string{
"app": "bar",
}
makeService(t, s.KubeClient(), &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service",
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "http",
Port: 80,
}, {
Name: "http-other",
Port: 90,
}},
Selector: labels,
ClusterIP: "9.9.9.9",
},
})
// Create an expect endpoint
createEndpointSlice(t, s.KubeClient(), "slice1", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"1.2.3.4"})
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
// create an FQDN endpoint that should be ignored
createEndpointSliceWithType(t, s.KubeClient(), "slice1", "service",
namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"foo.com"}, discovery.AddressTypeFQDN)
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
// Add another port endpoint
createEndpointSlice(t, s.KubeClient(), "slice1", "service", namespace,
[]v1.EndpointPort{{Name: "http-other", Port: 90}, {Name: "http", Port: 80}}, []string{"1.2.3.4", "2.3.4.5"})
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80", "2.3.4.5:80"})
expectEndpoints(t, s, "outbound|90||service.namespace.svc.cluster.local", []string{"1.2.3.4:90", "2.3.4.5:90"})
// Move the endpoint to another slice - transition phase where its duplicated
createEndpointSlice(t, s.KubeClient(), "slice1", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"1.2.3.5", "2.3.4.5"})
createEndpointSlice(t, s.KubeClient(), "slice2", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"2.3.4.5"})
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.5:80", "2.3.4.5:80"})
// Move the endpoint to another slice - completed
createEndpointSlice(t, s.KubeClient(), "slice1", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"1.2.3.4"})
createEndpointSlice(t, s.KubeClient(), "slice2", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"2.3.4.5"})
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80", "2.3.4.5:80"})
// Delete endpoint
createEndpointSlice(t, s.KubeClient(), "slice1", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"1.2.3.4"})
createEndpointSlice(t, s.KubeClient(), "slice2", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{})
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
_ = s.KubeClient().DiscoveryV1().EndpointSlices(namespace).Delete(context.TODO(), "slice1", metav1.DeleteOptions{})
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
// Ensure there is nothing is left over
expectServiceInstances(t, s.KubeRegistry, &model.Service{
Hostname: "service.namespace.svc.cluster.local",
Ports: []*model.Port{{
Name: "http",
Port: 80,
Protocol: "http",
}},
Attributes: model.ServiceAttributes{
Namespace: namespace,
Name: "service",
LabelSelectors: labels,
},
}, 80, []ServiceInstanceResponse{})
}
// TestEndpointSlicingServiceUpdate is a regression test to ensure we do not end up with duplicate endpoints when a service changes.
func TestEndpointSlicingServiceUpdate(t *testing.T) {
for _, version := range []string{"latest", "20"} {
t.Run("kuberentes 1."+version, func(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{
KubernetesEndpointMode: kubecontroller.EndpointSliceOnly,
KubernetesVersion: version,
EnableFakeXDSUpdater: true,
})
namespace := "namespace"
labels := map[string]string{
"app": "bar",
}
makeService(t, s.KubeClient(), &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service",
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "http",
Port: 80,
}, {
Name: "http-other",
Port: 90,
}},
Selector: labels,
ClusterIP: "9.9.9.9",
},
})
xdsUpdater := s.XdsUpdater.(*xds.FakeXdsUpdater)
createEndpointSlice(t, s.KubeClient(), "slice1", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"1.2.3.4"})
createEndpointSlice(t, s.KubeClient(), "slice2", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"1.2.3.4"})
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
xdsUpdater.WaitOrFail(t, "svcupdate")
// Trigger a service updates
makeService(t, s.KubeClient(), &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service",
Namespace: namespace,
Labels: map[string]string{"foo": "bar"},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "http",
Port: 80,
}, {
Name: "http-other",
Port: 90,
}},
Selector: labels,
ClusterIP: "9.9.9.9",
},
})
xdsUpdater.WaitOrFail(t, "svcupdate")
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
})
}
}
func TestSameIPEndpointSlicing(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{
KubernetesEndpointMode: kubecontroller.EndpointSliceOnly,
EnableFakeXDSUpdater: true,
})
namespace := "namespace"
labels := map[string]string{
"app": "bar",
}
makeService(t, s.KubeClient(), &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service",
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "http",
Port: 80,
}, {
Name: "http-other",
Port: 90,
}},
Selector: labels,
ClusterIP: "9.9.9.9",
},
})
xdsUpdater := s.XdsUpdater.(*xds.FakeXdsUpdater)
// Delete endpoints with same IP
createEndpointSlice(t, s.KubeClient(), "slice1", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"1.2.3.4"})
createEndpointSlice(t, s.KubeClient(), "slice2", "service", namespace, []v1.EndpointPort{{Name: "http", Port: 80}}, []string{"1.2.3.4"})
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
// delete slice 1, it should still exist
_ = s.KubeClient().DiscoveryV1().EndpointSlices(namespace).Delete(context.TODO(), "slice1", metav1.DeleteOptions{})
xdsUpdater.WaitOrFail(t, "eds")
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", []string{"1.2.3.4:80"})
_ = s.KubeClient().DiscoveryV1().EndpointSlices(namespace).Delete(context.TODO(), "slice2", metav1.DeleteOptions{})
xdsUpdater.WaitOrFail(t, "eds")
expectEndpoints(t, s, "outbound|80||service.namespace.svc.cluster.local", nil)
}
type ServiceInstanceResponse struct {
Hostname host.Name
Namestring string
Address string
Port uint32
}
func expectEndpoints(t *testing.T, s *xds.FakeDiscoveryServer, cluster string, expected []string) {
t.Helper()
retry.UntilSuccessOrFail(t, func() error {
got := xdstest.ExtractLoadAssignments(s.Endpoints(s.SetupProxy(nil)))
sort.Strings(got[cluster])
sort.Strings(expected)
if !reflect.DeepEqual(got[cluster], expected) {
return fmt.Errorf("wanted %v got %v. All endpoints: %+v", expected, got[cluster], got)
}
return nil
}, retry.Converge(2), retry.Timeout(time.Second*2), retry.Delay(time.Millisecond*10))
}
// nolint: unparam
func expectServiceInstances(t *testing.T, sd serviceregistry.Instance, svc *model.Service, port int, expected []ServiceInstanceResponse) {
t.Helper()
svc.Attributes.ServiceRegistry = sd.Provider()
// The system is eventually consistent, so add some retries
retry.UntilSuccessOrFail(t, func() error {
instances := sd.InstancesByPort(svc, port, nil)
sortServiceInstances(instances)
got := []ServiceInstanceResponse{}
for _, i := range instances {
got = append(got, ServiceInstanceResponse{
Hostname: i.Service.Hostname,
Namestring: i.Service.Attributes.Namespace,
Address: i.Endpoint.Address,
Port: i.Endpoint.EndpointPort,
})
}
if err := compare(t, got, expected); err != nil {
return fmt.Errorf("%v", err)
}
return nil
}, retry.Converge(2), retry.Timeout(time.Second*2), retry.Delay(time.Millisecond*10))
}
func compare(t *testing.T, actual, expected interface{}) error {
return util.Compare(jsonBytes(t, actual), jsonBytes(t, expected))
}
func sortServiceInstances(instances []*model.ServiceInstance) {
sort.Slice(instances, func(i, j int) bool {
if instances[i].Service.Hostname == instances[j].Service.Hostname {
return instances[i].Endpoint.Address < instances[j].Endpoint.Address
}
return instances[i].Service.Hostname < instances[j].Service.Hostname
})
}
func jsonBytes(t *testing.T, v interface{}) []byte {
data, err := json.MarshalIndent(v, "", " ")
if err != nil {
t.Fatal(t)
}
return data
}
func setPodReady(pod *v1.Pod) {
pod.Status.Conditions = []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
},
}
}
func setPodUnready(pod *v1.Pod) {
pod.Status.Conditions = []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionFalse,
LastTransitionTime: metav1.Now(),
},
}
}
func makePod(t *testing.T, c kubernetes.Interface, pod *v1.Pod) {
t.Helper()
newPod, err := c.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
if kerrors.IsAlreadyExists(err) {
newPod, err = c.CoreV1().Pods(pod.Namespace).Update(context.Background(), pod, metav1.UpdateOptions{})
}
if err != nil {
t.Fatal(err)
}
// Apiserver doesn't allow Create/Update to modify the pod status. Creating doesn't result in
// events - since PodIP will be "".
newPod.Status.PodIP = pod.Status.PodIP
newPod.Status.Phase = v1.PodRunning
// Also need to sets the pod to be ready as now we only add pod into service entry endpoint when it's ready
setPodReady(newPod)
_, err = c.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), newPod, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
}
func makeService(t *testing.T, c kubernetes.Interface, svc *v1.Service) {
t.Helper()
// avoid mutating input
svc = svc.DeepCopy()
// simulate actual k8s behavior
for i, port := range svc.Spec.Ports {
if port.TargetPort.IntVal == 0 && port.TargetPort.StrVal == "" {
svc.Spec.Ports[i].TargetPort.IntVal = port.Port
}
}
_, err := c.CoreV1().Services(svc.Namespace).Create(context.Background(), svc, metav1.CreateOptions{})
if kerrors.IsAlreadyExists(err) {
_, err = c.CoreV1().Services(svc.Namespace).Update(context.Background(), svc, metav1.UpdateOptions{})
}
if err != nil {
t.Fatal(err)
}
}
func makeIstioObject(t *testing.T, c model.ConfigStore, svc config.Config) {
t.Helper()
_, err := c.Create(svc)
if err != nil && err.Error() == "item already exists" {
_, err = c.Update(svc)
}
if err != nil {
t.Fatal(err)
}
}
func createEndpoints(t *testing.T, c kubernetes.Interface, name, namespace string, ports []v1.EndpointPort, ips []string) {
eas := make([]v1.EndpointAddress, 0)
for _, ip := range ips {
eas = append(eas, v1.EndpointAddress{IP: ip, TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod",
Namespace: namespace,
}})
}
endpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Subsets: []v1.EndpointSubset{{
Addresses: eas,
Ports: ports,
}},
}
if _, err := c.CoreV1().Endpoints(namespace).Create(context.TODO(), endpoint, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create endpoints %s in namespace %s (error %v)", name, namespace, err)
}
}
// nolint: unparam
func createEndpointSlice(t *testing.T, c kubernetes.Interface, name, serviceName, namespace string, ports []v1.EndpointPort, addrs []string) {
createEndpointSliceWithType(t, c, name, serviceName, namespace, ports, addrs, discovery.AddressTypeIPv4)
}
// nolint: unparam
func createEndpointSliceWithType(t *testing.T, c kubernetes.Interface, name, serviceName, namespace string,
ports []v1.EndpointPort, ips []string, addrType discovery.AddressType) {
esps := make([]discovery.EndpointPort, 0)
for _, name := range ports {
n := name // Create a stable reference to take the pointer from
esps = append(esps, discovery.EndpointPort{
Name: &n.Name,
Protocol: &n.Protocol,
Port: &n.Port,
AppProtocol: n.AppProtocol,
})
}
sliceEndpoint := []discovery.Endpoint{}
for _, ip := range ips {
sliceEndpoint = append(sliceEndpoint, discovery.Endpoint{
Addresses: []string{ip},
})
}
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
discovery.LabelServiceName: serviceName,
},
},
AddressType: addrType,
Endpoints: sliceEndpoint,
Ports: esps,
}
if _, err := c.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{}); err != nil {
if kerrors.IsAlreadyExists(err) {
_, err = c.DiscoveryV1().EndpointSlices(namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
}
if err != nil {
t.Fatalf("failed to create endpoint slice %s in namespace %s (error %v)", name, namespace, err)
}
}
}