blob: 45ae921715f3315204d84601132f72de91050ba2 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"context"
"fmt"
"net"
"reflect"
"sort"
"strconv"
"sync"
"testing"
"time"
)
import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/google/go-cmp/cmp"
"istio.io/api/annotation"
"istio.io/api/label"
meshconfig "istio.io/api/mesh/v1alpha1"
coreV1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/errors"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller/filter"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/network"
"github.com/apache/dubbo-go-pixiu/pkg/spiffe"
"github.com/apache/dubbo-go-pixiu/pkg/test"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
)
const (
testService = "test"
)
// eventually polls cond until it completes (returns true) or times out (resulting in a test failure).
func eventually(t test.Failer, cond func() bool) {
t.Helper()
retry.UntilSuccessOrFail(t, func() error {
if !cond() {
return fmt.Errorf("failed to get positive condition")
}
return nil
}, retry.Timeout(time.Second), retry.Delay(time.Millisecond*10))
}
func TestServices(t *testing.T) {
networksWatcher := mesh.NewFixedNetworksWatcher(&meshconfig.MeshNetworks{
Networks: map[string]*meshconfig.Network{
"network1": {
Endpoints: []*meshconfig.Network_NetworkEndpoints{
{
Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
FromCidr: "10.10.1.1/24",
},
},
},
},
"network2": {
Endpoints: []*meshconfig.Network_NetworkEndpoints{
{
Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
FromCidr: "10.11.1.1/24",
},
},
},
},
},
})
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
ctl, fx := NewFakeControllerWithOptions(FakeControllerOptions{NetworksWatcher: networksWatcher, Mode: mode})
go ctl.Run(ctl.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(ctl.stop, ctl.HasSynced)
defer ctl.Stop()
t.Parallel()
ns := "ns-test"
hostname := kube.ServiceHostname(testService, ns, defaultFakeDomainSuffix)
var sds model.ServiceDiscovery = ctl
// "test", ports: http-example on 80
makeService(testService, ns, ctl.client, t)
<-fx.Events
eventually(t, func() bool {
out := sds.Services()
// Original test was checking for 'protocolTCP' - which is incorrect (the
// port name is 'http'. It was working because the Service was created with
// an invalid protocol, and the code was ignoring that ( not TCP/UDP).
for _, item := range out {
if item.Hostname == hostname &&
len(item.Ports) == 1 &&
item.Ports[0].Protocol == protocol.HTTP {
return true
}
}
return false
})
// 2 ports 1001, 2 IPs
createEndpoints(t, ctl, testService, ns, []string{"http-example", "foo"}, []string{"10.10.1.1", "10.11.1.2"}, nil, nil)
svc := sds.GetService(hostname)
if svc == nil {
t.Fatalf("GetService(%q) => should exists", hostname)
}
if svc.Hostname != hostname {
t.Fatalf("GetService(%q) => %q", hostname, svc.Hostname)
}
eventually(t, func() bool {
ep := sds.InstancesByPort(svc, 80, nil)
return len(ep) == 2
})
ep := sds.InstancesByPort(svc, 80, nil)
if len(ep) != 2 {
t.Fatalf("Invalid response for GetInstancesByPort %v", ep)
}
if ep[0].Endpoint.Address == "10.10.1.1" && ep[0].Endpoint.Network != "network1" {
t.Fatalf("Endpoint with IP 10.10.1.1 is expected to be in network1 but get: %s", ep[0].Endpoint.Network)
}
if ep[1].Endpoint.Address == "10.11.1.2" && ep[1].Endpoint.Network != "network2" {
t.Fatalf("Endpoint with IP 10.11.1.2 is expected to be in network2 but get: %s", ep[1].Endpoint.Network)
}
missing := kube.ServiceHostname("does-not-exist", ns, defaultFakeDomainSuffix)
svc = sds.GetService(missing)
if svc != nil {
t.Fatalf("GetService(%q) => %s, should not exist", missing, svc.Hostname)
}
})
}
}
func makeService(n, ns string, cl kubernetes.Interface, t *testing.T) {
_, err := cl.CoreV1().Services(ns).Create(context.TODO(), &coreV1.Service{
ObjectMeta: metaV1.ObjectMeta{Name: n},
Spec: coreV1.ServiceSpec{
Ports: []coreV1.ServicePort{
{
Port: 80,
Name: "http-example",
Protocol: coreV1.ProtocolTCP, // Not added automatically by fake
},
},
},
}, metaV1.CreateOptions{})
if err != nil {
t.Log("Service already created (rerunning test)")
}
log.Infof("Created service %s", n)
}
func TestController_GetPodLocality(t *testing.T) {
pod1 := generatePod("128.0.1.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
pod2 := generatePod("128.0.1.2", "pod2", "nsB", "", "node2", map[string]string{"app": "prod-app"}, map[string]string{})
podOverride := generatePod("128.0.1.2", "pod2", "nsB", "",
"node1", map[string]string{"app": "prod-app", model.LocalityLabel: "regionOverride.zoneOverride.subzoneOverride"}, map[string]string{})
testCases := []struct {
name string
pods []*coreV1.Pod
nodes []*coreV1.Node
wantAZ map[*coreV1.Pod]string
}{
{
name: "should return correct az for given address",
pods: []*coreV1.Pod{pod1, pod2},
nodes: []*coreV1.Node{
generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
generateNode("node2", map[string]string{NodeZoneLabel: "zone2", NodeRegionLabel: "region2", label.TopologySubzone.Name: "subzone2"}),
},
wantAZ: map[*coreV1.Pod]string{
pod1: "region1/zone1/subzone1",
pod2: "region2/zone2/subzone2",
},
},
{
name: "should return correct az for given address",
pods: []*coreV1.Pod{pod1, pod2},
nodes: []*coreV1.Node{
generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1"}),
generateNode("node2", map[string]string{NodeZoneLabel: "zone2", NodeRegionLabel: "region2"}),
},
wantAZ: map[*coreV1.Pod]string{
pod1: "region1/zone1/",
pod2: "region2/zone2/",
},
},
{
name: "should return false if pod isn't in the cache",
wantAZ: map[*coreV1.Pod]string{
pod1: "",
pod2: "",
},
},
{
name: "should return false if node isn't in the cache",
pods: []*coreV1.Pod{pod1, pod2},
wantAZ: map[*coreV1.Pod]string{
pod1: "",
pod2: "",
},
},
{
name: "should return correct az if node has only region label",
pods: []*coreV1.Pod{pod1, pod2},
nodes: []*coreV1.Node{
generateNode("node1", map[string]string{NodeRegionLabel: "region1"}),
generateNode("node2", map[string]string{NodeRegionLabel: "region2"}),
},
wantAZ: map[*coreV1.Pod]string{
pod1: "region1//",
pod2: "region2//",
},
},
{
name: "should return correct az if node has only zone label",
pods: []*coreV1.Pod{pod1, pod2},
nodes: []*coreV1.Node{
generateNode("node1", map[string]string{NodeZoneLabel: "zone1"}),
generateNode("node2", map[string]string{NodeZoneLabel: "zone2"}),
},
wantAZ: map[*coreV1.Pod]string{
pod1: "/zone1/",
pod2: "/zone2/",
},
},
{
name: "should return correct az if node has only subzone label",
pods: []*coreV1.Pod{pod1, pod2},
nodes: []*coreV1.Node{
generateNode("node1", map[string]string{label.TopologySubzone.Name: "subzone1"}),
generateNode("node2", map[string]string{label.TopologySubzone.Name: "subzone2"}),
},
wantAZ: map[*coreV1.Pod]string{
pod1: "//subzone1",
pod2: "//subzone2",
},
},
{
name: "should return correct az for given address",
pods: []*coreV1.Pod{podOverride},
nodes: []*coreV1.Node{
generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
},
wantAZ: map[*coreV1.Pod]string{
podOverride: "regionOverride/zoneOverride/subzoneOverride",
},
},
}
for _, tc := range testCases {
// If using t.Parallel() you must copy the iteration to a new local variable
// https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// Setup kube caches
// Pod locality only matters for Endpoints
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{Mode: EndpointsOnly})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
addNodes(t, controller, tc.nodes...)
addPods(t, controller, fx, tc.pods...)
// Verify expected existing pod AZs
for pod, wantAZ := range tc.wantAZ {
az := controller.getPodLocality(pod)
if wantAZ != "" {
if !reflect.DeepEqual(az, wantAZ) {
t.Fatalf("Wanted az: %s, got: %s", wantAZ, az)
}
} else {
if az != "" {
t.Fatalf("Unexpectedly found az: %s for pod: %s", az, pod.ObjectMeta.Name)
}
}
}
})
}
}
func TestGetProxyServiceInstances(t *testing.T) {
clusterID := cluster.ID("fakeCluster")
networkID := network.ID("fakeNetwork")
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{
Mode: mode,
ClusterID: clusterID,
})
// add a network ID to test endpoints include topology.istio.io/network label
controller.network = networkID
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
p := generatePod("128.0.0.1", "pod1", "nsa", "foo", "node1", map[string]string{"app": "test-app"}, map[string]string{})
addPods(t, controller, fx, p)
k8sSaOnVM := "acct4"
canonicalSaOnVM := "acctvm2@gserviceaccount2.com"
createService(controller, "svc1", "nsa",
map[string]string{
annotation.AlphaKubernetesServiceAccounts.Name: k8sSaOnVM,
annotation.AlphaCanonicalServiceAccounts.Name: canonicalSaOnVM,
},
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
ev := fx.Wait("service")
if ev == nil {
t.Fatal("Timeout creating service")
}
// Endpoints are generated by Kubernetes from pod labels and service selectors.
// Here we manually create them for mocking purpose.
svc1Ips := []string{"128.0.0.1"}
portNames := []string{"tcp-port"}
// Create 1 endpoint that refers to a pod in the same namespace.
createEndpoints(t, controller, "svc1", "nsA", portNames, svc1Ips, nil, nil)
// Creates 100 endpoints that refers to a pod in a different namespace.
fakeSvcCounts := 100
for i := 0; i < fakeSvcCounts; i++ {
svcName := fmt.Sprintf("svc-fake-%d", i)
createService(controller, svcName, "nsfake",
map[string]string{
annotation.AlphaKubernetesServiceAccounts.Name: k8sSaOnVM,
annotation.AlphaCanonicalServiceAccounts.Name: canonicalSaOnVM,
},
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
fx.Wait("service")
createEndpoints(t, controller, svcName, "nsfake", portNames, svc1Ips, nil, nil)
fx.Wait("eds")
}
// Create 1 endpoint that refers to a pod in the same namespace.
createEndpoints(t, controller, "svc1", "nsa", portNames, svc1Ips, nil, nil)
fx.Wait("eds")
// this can test get pod by proxy ID
svcNode := &model.Proxy{
Type: model.Router,
IPAddresses: []string{"128.0.0.1"},
ID: "pod1.nsa",
DNSDomain: "nsa.svc.cluster.local",
Metadata: &model.NodeMetadata{Namespace: "nsa", ClusterID: clusterID},
}
serviceInstances := controller.GetProxyServiceInstances(svcNode)
if len(serviceInstances) != 1 {
t.Fatalf("GetProxyServiceInstances() expected 1 instance, got %d", len(serviceInstances))
}
hostname := kube.ServiceHostname("svc1", "nsa", defaultFakeDomainSuffix)
if serviceInstances[0].Service.Hostname != hostname {
t.Fatalf("GetProxyServiceInstances() wrong service instance returned => hostname %q, want %q",
serviceInstances[0].Service.Hostname, hostname)
}
// Test that we can look up instances just by Proxy metadata
metaServices := controller.GetProxyServiceInstances(&model.Proxy{
Type: "sidecar",
IPAddresses: []string{"1.1.1.1"},
Locality: &core.Locality{Region: "r", Zone: "z"},
ConfigNamespace: "nsa",
Metadata: &model.NodeMetadata{
ServiceAccount: "account",
ClusterID: clusterID,
Labels: map[string]string{
"app": "prod-app",
label.SecurityTlsMode.Name: "mutual",
},
},
})
expected := &model.ServiceInstance{
Service: &model.Service{
Hostname: "svc1.nsa.svc.company.com",
ClusterVIPs: model.AddressMap{
Addresses: map[cluster.ID][]string{clusterID: {"10.0.0.1"}},
},
DefaultAddress: "10.0.0.1",
Ports: []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
ServiceAccounts: []string{"acctvm2@gserviceaccount2.com", "spiffe://cluster.local/ns/nsa/sa/acct4"},
Attributes: model.ServiceAttributes{
ServiceRegistry: provider.Kubernetes,
Name: "svc1",
Namespace: "nsa",
LabelSelectors: map[string]string{"app": "prod-app"},
},
},
ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{
"app": "prod-app",
label.SecurityTlsMode.Name: "mutual",
NodeRegionLabelGA: "r",
NodeZoneLabelGA: "z",
label.TopologyCluster.Name: clusterID.String(),
label.TopologyNetwork.Name: networkID.String(),
},
ServiceAccount: "account",
Address: "1.1.1.1",
Network: networkID,
EndpointPort: 0,
ServicePortName: "tcp-port",
Locality: model.Locality{
Label: "r/z",
ClusterID: clusterID,
},
TLSMode: "mutual",
},
}
if len(metaServices) != 1 {
t.Fatalf("expected 1 instance, got %v", len(metaServices))
}
// Remove the discoverability function so that it's ignored by DeepEqual.
clearDiscoverabilityPolicy(metaServices[0].Endpoint)
if !reflect.DeepEqual(expected, metaServices[0]) {
t.Fatalf("expected instance %v, got %v", expected, metaServices[0])
}
// Test that we first look up instances by Proxy pod
node := generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"})
addNodes(t, controller, node)
// 1. pod without `istio-locality` label, get locality from node label.
p = generatePod("129.0.0.1", "pod2", "nsa", "svcaccount", "node1",
map[string]string{"app": "prod-app"}, nil)
addPods(t, controller, fx, p)
// this can test get pod by proxy ip address
podServices := controller.GetProxyServiceInstances(&model.Proxy{
Type: "sidecar",
IPAddresses: []string{"129.0.0.1"},
Locality: &core.Locality{Region: "r", Zone: "z"},
ConfigNamespace: "nsa",
Metadata: &model.NodeMetadata{
ServiceAccount: "account",
ClusterID: clusterID,
Labels: map[string]string{
"app": "prod-app",
},
},
})
expected = &model.ServiceInstance{
Service: &model.Service{
Hostname: "svc1.nsa.svc.company.com",
ClusterVIPs: model.AddressMap{
Addresses: map[cluster.ID][]string{clusterID: {"10.0.0.1"}},
},
DefaultAddress: "10.0.0.1",
Ports: []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
ServiceAccounts: []string{"acctvm2@gserviceaccount2.com", "spiffe://cluster.local/ns/nsa/sa/acct4"},
Attributes: model.ServiceAttributes{
ServiceRegistry: provider.Kubernetes,
Name: "svc1",
Namespace: "nsa",
LabelSelectors: map[string]string{"app": "prod-app"},
},
},
ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
Endpoint: &model.IstioEndpoint{
Address: "129.0.0.1",
Network: networkID,
EndpointPort: 0,
ServicePortName: "tcp-port",
Locality: model.Locality{
Label: "region1/zone1/subzone1",
ClusterID: clusterID,
},
Labels: labels.Instance{
"app": "prod-app",
NodeRegionLabelGA: "region1",
NodeZoneLabelGA: "zone1",
label.TopologySubzone.Name: "subzone1",
label.TopologyCluster.Name: clusterID.String(),
label.TopologyNetwork.Name: networkID.String(),
},
ServiceAccount: "spiffe://cluster.local/ns/nsa/sa/svcaccount",
TLSMode: model.DisabledTLSModeLabel,
WorkloadName: "pod2",
Namespace: "nsa",
},
}
if len(podServices) != 1 {
t.Fatalf("expected 1 instance, got %v", len(podServices))
}
clearDiscoverabilityPolicy(podServices[0].Endpoint)
if !reflect.DeepEqual(expected, podServices[0]) {
t.Fatalf("expected instance %v, got %v", expected, podServices[0])
}
// 2. pod with `istio-locality` label, ignore node label.
p = generatePod("129.0.0.2", "pod3", "nsa", "svcaccount", "node1",
map[string]string{"app": "prod-app", "istio-locality": "region.zone"}, nil)
addPods(t, controller, fx, p)
// this can test get pod by proxy ip address
podServices = controller.GetProxyServiceInstances(&model.Proxy{
Type: "sidecar",
IPAddresses: []string{"129.0.0.2"},
Locality: &core.Locality{Region: "r", Zone: "z"},
ConfigNamespace: "nsa",
Metadata: &model.NodeMetadata{
ServiceAccount: "account",
ClusterID: clusterID,
Labels: map[string]string{
"app": "prod-app",
},
},
})
expected = &model.ServiceInstance{
Service: &model.Service{
Hostname: "svc1.nsa.svc.company.com",
ClusterVIPs: model.AddressMap{
Addresses: map[cluster.ID][]string{clusterID: {"10.0.0.1"}},
},
DefaultAddress: "10.0.0.1",
Ports: []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
ServiceAccounts: []string{"acctvm2@gserviceaccount2.com", "spiffe://cluster.local/ns/nsa/sa/acct4"},
Attributes: model.ServiceAttributes{
ServiceRegistry: provider.Kubernetes,
Name: "svc1",
Namespace: "nsa",
LabelSelectors: map[string]string{"app": "prod-app"},
},
},
ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
Endpoint: &model.IstioEndpoint{
Address: "129.0.0.2",
Network: networkID,
EndpointPort: 0,
ServicePortName: "tcp-port",
Locality: model.Locality{
Label: "region/zone",
ClusterID: clusterID,
},
Labels: labels.Instance{
"app": "prod-app",
"istio-locality": "region.zone",
NodeRegionLabelGA: "region",
NodeZoneLabelGA: "zone",
label.TopologyCluster.Name: clusterID.String(),
label.TopologyNetwork.Name: networkID.String(),
},
ServiceAccount: "spiffe://cluster.local/ns/nsa/sa/svcaccount",
TLSMode: model.DisabledTLSModeLabel,
WorkloadName: "pod3",
Namespace: "nsa",
},
}
if len(podServices) != 1 {
t.Fatalf("expected 1 instance, got %v", len(podServices))
}
clearDiscoverabilityPolicy(podServices[0].Endpoint)
if !reflect.DeepEqual(expected, podServices[0]) {
t.Fatalf("expected instance %v, got %v", expected, podServices[0])
}
})
}
}
func TestGetProxyServiceInstancesWithMultiIPsAndTargetPorts(t *testing.T) {
pod1 := generatePod("128.0.0.1", "pod1", "nsa", "foo", "node1", map[string]string{"app": "test-app"}, map[string]string{})
testCases := []struct {
name string
pods []*coreV1.Pod
ips []string
ports []coreV1.ServicePort
wantEndpoints []model.IstioEndpoint
}{
{
name: "multiple proxy ips single port",
pods: []*coreV1.Pod{pod1},
ips: []string{"128.0.0.1", "192.168.2.6"},
ports: []coreV1.ServicePort{
{
Name: "tcp-port",
Port: 8080,
Protocol: "http",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
wantEndpoints: []model.IstioEndpoint{
{
Address: "128.0.0.1",
ServicePortName: "tcp-port",
EndpointPort: 8080,
},
{
Address: "192.168.2.6",
ServicePortName: "tcp-port",
EndpointPort: 8080,
},
},
},
{
name: "single proxy ip single port",
pods: []*coreV1.Pod{pod1},
ips: []string{"128.0.0.1"},
ports: []coreV1.ServicePort{
{
Name: "tcp-port",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
wantEndpoints: []model.IstioEndpoint{
{
Address: "128.0.0.1",
ServicePortName: "tcp-port",
EndpointPort: 8080,
},
},
},
{
name: "multiple proxy ips multiple ports",
pods: []*coreV1.Pod{pod1},
ips: []string{"128.0.0.1", "192.168.2.6"},
ports: []coreV1.ServicePort{
{
Name: "tcp-port-1",
Port: 8080,
Protocol: "http",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
{
Name: "tcp-port-2",
Port: 9090,
Protocol: "http",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 9090},
},
},
wantEndpoints: []model.IstioEndpoint{
{
Address: "128.0.0.1",
ServicePortName: "tcp-port-1",
EndpointPort: 8080,
},
{
Address: "192.168.2.6",
ServicePortName: "tcp-port-1",
EndpointPort: 8080,
},
{
Address: "128.0.0.1",
ServicePortName: "tcp-port-2",
EndpointPort: 9090,
},
{
Address: "192.168.2.6",
ServicePortName: "tcp-port-2",
EndpointPort: 9090,
},
},
},
{
name: "single proxy ip multiple ports same target port with different protocols",
pods: []*coreV1.Pod{pod1},
ips: []string{"128.0.0.1"},
ports: []coreV1.ServicePort{
{
Name: "tcp-port",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
{
Name: "http-port",
Port: 9090,
Protocol: "TCP",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
wantEndpoints: []model.IstioEndpoint{
{
Address: "128.0.0.1",
ServicePortName: "tcp-port",
EndpointPort: 8080,
},
{
Address: "128.0.0.1",
ServicePortName: "http-port",
EndpointPort: 8080,
},
},
},
{
name: "single proxy ip multiple ports same target port with overlapping protocols",
pods: []*coreV1.Pod{pod1},
ips: []string{"128.0.0.1"},
ports: []coreV1.ServicePort{
{
Name: "http-7442",
Port: 7442,
Protocol: "TCP",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 7442},
},
{
Name: "tcp-8443",
Port: 8443,
Protocol: "TCP",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 7442},
},
{
Name: "http-7557",
Port: 7557,
Protocol: "TCP",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 7442},
},
},
wantEndpoints: []model.IstioEndpoint{
{
Address: "128.0.0.1",
ServicePortName: "http-7442",
EndpointPort: 7442,
},
{
Address: "128.0.0.1",
ServicePortName: "tcp-8443",
EndpointPort: 7442,
},
},
},
{
name: "single proxy ip multiple ports",
pods: []*coreV1.Pod{pod1},
ips: []string{"128.0.0.1"},
ports: []coreV1.ServicePort{
{
Name: "tcp-port",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
{
Name: "http-port",
Port: 9090,
Protocol: "TCP",
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 9090},
},
},
wantEndpoints: []model.IstioEndpoint{
{
Address: "128.0.0.1",
ServicePortName: "tcp-port",
EndpointPort: 8080,
},
{
Address: "128.0.0.1",
ServicePortName: "http-port",
EndpointPort: 9090,
},
},
},
}
for _, c := range testCases {
for mode, name := range EndpointModeNames {
mode := mode
t.Run(fmt.Sprintf("%s_%s", c.name, name), func(t *testing.T) {
// Setup kube caches
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{Mode: mode})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
addPods(t, controller, fx, c.pods...)
createServiceWithTargetPorts(controller, "svc1", "nsa",
map[string]string{
annotation.AlphaKubernetesServiceAccounts.Name: "acct4",
annotation.AlphaCanonicalServiceAccounts.Name: "acctvm2@gserviceaccount2.com",
},
c.ports, map[string]string{"app": "test-app"}, t)
ev := fx.Wait("service")
if ev == nil {
t.Fatal("Timeout creating service")
}
serviceInstances := controller.GetProxyServiceInstances(&model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: c.ips})
for i, svc := range serviceInstances {
if svc.Endpoint.Address != c.wantEndpoints[i].Address {
t.Errorf("wrong endpoint address at #i endpoint, got %v want %v", svc.Endpoint.Address, c.wantEndpoints[i].Address)
}
if svc.Endpoint.EndpointPort != c.wantEndpoints[i].EndpointPort {
t.Errorf("wrong endpoint port at #i endpoint, got %v want %v", svc.Endpoint.EndpointPort, c.wantEndpoints[i].EndpointPort)
}
if svc.Endpoint.ServicePortName != c.wantEndpoints[i].ServicePortName {
t.Errorf("wrong svc port at #i endpoint, got %v want %v", svc.Endpoint.ServicePortName, c.wantEndpoints[i].ServicePortName)
}
}
})
}
}
}
func TestGetProxyServiceInstances_WorkloadInstance(t *testing.T) {
ctl, fx := NewFakeControllerWithOptions(FakeControllerOptions{})
go ctl.Run(ctl.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(ctl.stop, ctl.HasSynced)
defer ctl.Stop()
createService(ctl, "ratings", "bookinfo-ratings",
map[string]string{
annotation.AlphaKubernetesServiceAccounts.Name: "ratings",
annotation.AlphaCanonicalServiceAccounts.Name: "ratings@gserviceaccount2.com",
},
[]int32{8080}, map[string]string{"app": "ratings"}, t)
fx.WaitOrFail(t, "service")
createService(ctl, "details", "bookinfo-details",
map[string]string{
annotation.AlphaKubernetesServiceAccounts.Name: "details",
annotation.AlphaCanonicalServiceAccounts.Name: "details@gserviceaccount2.com",
},
[]int32{9090}, map[string]string{"app": "details"}, t)
fx.WaitOrFail(t, "service")
createService(ctl, "reviews", "bookinfo-reviews",
map[string]string{
annotation.AlphaKubernetesServiceAccounts.Name: "reviews",
annotation.AlphaCanonicalServiceAccounts.Name: "reviews@gserviceaccount2.com",
},
[]int32{7070}, map[string]string{"app": "reviews"}, t)
fx.WaitOrFail(t, "service")
wiRatings1 := &model.WorkloadInstance{
Name: "ratings-1",
Namespace: "bookinfo-ratings",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "ratings"},
Address: "2.2.2.21",
EndpointPort: 8080,
},
}
wiDetails1 := &model.WorkloadInstance{
Name: "details-1",
Namespace: "bookinfo-details",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "details"},
Address: "2.2.2.21",
EndpointPort: 9090,
},
}
wiReviews1 := &model.WorkloadInstance{
Name: "reviews-1",
Namespace: "bookinfo-reviews",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "reviews"},
Address: "3.3.3.31",
EndpointPort: 7070,
},
}
wiReviews2 := &model.WorkloadInstance{
Name: "reviews-2",
Namespace: "bookinfo-reviews",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "reviews"},
Address: "3.3.3.32",
EndpointPort: 7071,
},
}
wiProduct1 := &model.WorkloadInstance{
Name: "productpage-1",
Namespace: "bookinfo-productpage",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "productpage"},
Address: "4.4.4.41",
EndpointPort: 6060,
},
}
for _, wi := range []*model.WorkloadInstance{wiRatings1, wiDetails1, wiReviews1, wiReviews2, wiProduct1} {
ctl.WorkloadInstanceHandler(wi, model.EventAdd) // simulate adding a workload entry
}
cases := []struct {
name string
proxy *model.Proxy
want []*model.ServiceInstance
}{
{
name: "proxy with unspecified IP",
proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: nil},
want: nil,
},
{
name: "proxy with IP not in the registry",
proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"1.1.1.1"}},
want: nil,
},
{
name: "proxy with IP from the registry, 1 matching WE, but no matching Service",
proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"4.4.4.41"}},
want: nil,
},
{
name: "proxy with IP from the registry, 1 matching WE, and matching Service",
proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"3.3.3.31"}},
want: []*model.ServiceInstance{{
Service: &model.Service{
Hostname: "reviews.bookinfo-reviews.svc.company.com",
},
Endpoint: &model.IstioEndpoint{
Labels: map[string]string{"app": "reviews"},
Address: "3.3.3.31",
ServicePortName: "tcp-port",
EndpointPort: 7070,
},
}},
},
{
name: "proxy with IP from the registry, 2 matching WE, and matching Service",
proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"}},
want: []*model.ServiceInstance{{
Service: &model.Service{
Hostname: "details.bookinfo-details.svc.company.com",
},
Endpoint: &model.IstioEndpoint{
Labels: map[string]string{"app": "details"}, // should pick "details" because of ordering
Address: "2.2.2.21",
ServicePortName: "tcp-port",
EndpointPort: 9090,
},
}},
},
{
name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy ID equal to WE with a different address",
proxy: &model.Proxy{
Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
ID: "reviews-1.bookinfo-reviews", ConfigNamespace: "bookinfo-reviews",
},
want: []*model.ServiceInstance{{
Service: &model.Service{
Hostname: "details.bookinfo-details.svc.company.com",
},
Endpoint: &model.IstioEndpoint{
Labels: map[string]string{"app": "details"}, // should pick "details" because of ordering
Address: "2.2.2.21",
ServicePortName: "tcp-port",
EndpointPort: 9090,
},
}},
},
{
name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy ID equal to WE name, but proxy.ID != proxy.ConfigNamespace",
proxy: &model.Proxy{
Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
ID: "ratings-1.bookinfo-ratings", ConfigNamespace: "wrong-namespace",
},
want: []*model.ServiceInstance{{
Service: &model.Service{
Hostname: "details.bookinfo-details.svc.company.com",
},
Endpoint: &model.IstioEndpoint{
Labels: map[string]string{"app": "details"}, // should pick "details" because of ordering
Address: "2.2.2.21",
ServicePortName: "tcp-port",
EndpointPort: 9090,
},
}},
},
{
name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy.ID == WE name",
proxy: &model.Proxy{
Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
ID: "ratings-1.bookinfo-ratings", ConfigNamespace: "bookinfo-ratings",
},
want: []*model.ServiceInstance{{
Service: &model.Service{
Hostname: "ratings.bookinfo-ratings.svc.company.com",
},
Endpoint: &model.IstioEndpoint{
Labels: map[string]string{"app": "ratings"}, // should pick "ratings"
Address: "2.2.2.21",
ServicePortName: "tcp-port",
EndpointPort: 8080,
},
}},
},
{
name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy.ID != WE name, but proxy.ConfigNamespace == WE namespace",
proxy: &model.Proxy{
Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
ID: "wrong-name.bookinfo-ratings", ConfigNamespace: "bookinfo-ratings",
},
want: []*model.ServiceInstance{{
Service: &model.Service{
Hostname: "ratings.bookinfo-ratings.svc.company.com",
},
Endpoint: &model.IstioEndpoint{
Labels: map[string]string{"app": "ratings"}, // should pick "ratings"
Address: "2.2.2.21",
ServicePortName: "tcp-port",
EndpointPort: 8080,
},
}},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := ctl.GetProxyServiceInstances(tc.proxy)
if diff := cmp.Diff(len(tc.want), len(got)); diff != "" {
t.Fatalf("GetProxyServiceInstances() returned unexpected number of service instances (--want/++got): %v", diff)
}
for i := range tc.want {
if diff := cmp.Diff(tc.want[i].Service.Hostname, got[i].Service.Hostname); diff != "" {
t.Fatalf("GetProxyServiceInstances() returned unexpected value [%d].Service.Hostname (--want/++got): %v", i, diff)
}
if diff := cmp.Diff(tc.want[i].Endpoint, got[i].Endpoint); diff != "" {
t.Fatalf("GetProxyServiceInstances() returned unexpected value [%d].Endpoint (--want/++got): %v", i, diff)
}
}
})
}
}
func TestController_GetIstioServiceAccounts(t *testing.T) {
oldTrustDomain := spiffe.GetTrustDomain()
spiffe.SetTrustDomain(defaultFakeDomainSuffix)
defer spiffe.SetTrustDomain(oldTrustDomain)
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{Mode: mode})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
sa1 := "acct1"
sa2 := "acct2"
sa3 := "acct3"
k8sSaOnVM := "acct4"
canonicalSaOnVM := "acctvm@gserviceaccount.com"
pods := []*coreV1.Pod{
generatePod("128.0.0.1", "pod1", "nsA", sa1, "node1", map[string]string{"app": "test-app"}, map[string]string{}),
generatePod("128.0.0.2", "pod2", "nsA", sa2, "node2", map[string]string{"app": "prod-app"}, map[string]string{}),
generatePod("128.0.0.3", "pod3", "nsB", sa3, "node1", map[string]string{"app": "prod-app"}, map[string]string{}),
}
addPods(t, controller, fx, pods...)
createService(controller, "svc1", "nsA",
map[string]string{
annotation.AlphaKubernetesServiceAccounts.Name: k8sSaOnVM,
annotation.AlphaCanonicalServiceAccounts.Name: canonicalSaOnVM,
},
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
fx.Wait("service")
createService(controller, "svc2", "nsA", nil, []int32{8080}, map[string]string{"app": "staging-app"}, t)
fx.Wait("service")
// Endpoints are generated by Kubernetes from pod labels and service selectors.
// Here we manually create them for mocking purpose.
svc1Ips := []string{"128.0.0.2"}
svc2Ips := make([]string, 0)
portNames := []string{"tcp-port"}
createEndpoints(t, controller, "svc1", "nsA", portNames, svc1Ips, nil, nil)
createEndpoints(t, controller, "svc2", "nsA", portNames, svc2Ips, nil, nil)
// We expect only one EDS update with Endpoints.
<-fx.Events
hostname := kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix)
svc := controller.GetService(hostname)
sa := controller.GetIstioServiceAccounts(svc, []int{8080})
sort.Strings(sa)
expected := []string{
canonicalSaOnVM,
"spiffe://company.com/ns/nsA/sa/" + sa2,
"spiffe://company.com/ns/nsA/sa/" + k8sSaOnVM,
}
if !reflect.DeepEqual(sa, expected) {
t.Fatalf("Unexpected service accounts %v (expecting %v)", sa, expected)
}
hostname = kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix)
svc = controller.GetService(hostname)
sa = controller.GetIstioServiceAccounts(svc, []int{})
if len(sa) != 0 {
t.Fatal("Failure: Expected to resolve 0 service accounts, but got: ", sa)
}
})
}
}
func TestController_Service(t *testing.T) {
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{Mode: mode})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
// Use a timeout to keep the test from hanging.
createService(controller, "svc1", "nsA",
map[string]string{},
[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)
<-fx.Events
createService(controller, "svc2", "nsA",
map[string]string{},
[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)
<-fx.Events
createService(controller, "svc3", "nsA",
map[string]string{},
[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)
<-fx.Events
createService(controller, "svc4", "nsA",
map[string]string{},
[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)
<-fx.Events
expectedSvcList := []*model.Service{
{
Hostname: kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8080,
Protocol: protocol.TCP,
},
},
},
{
Hostname: kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8081,
Protocol: protocol.TCP,
},
},
},
{
Hostname: kube.ServiceHostname("svc3", "nsA", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8082,
Protocol: protocol.TCP,
},
},
},
{
Hostname: kube.ServiceHostname("svc4", "nsA", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8083,
Protocol: protocol.TCP,
},
},
},
}
svcList := controller.Services()
servicesEqual(svcList, expectedSvcList)
})
}
}
func TestController_ServiceWithFixedDiscoveryNamespaces(t *testing.T) {
meshWatcher := mesh.NewFixedWatcher(&meshconfig.MeshConfig{
DiscoverySelectors: []*metaV1.LabelSelector{
{
MatchLabels: map[string]string{
"pilot-discovery": "enabled",
},
},
{
MatchExpressions: []metaV1.LabelSelectorRequirement{
{
Key: "env",
Operator: metaV1.LabelSelectorOpIn,
Values: []string{"test", "dev"},
},
},
},
},
})
svc1 := &model.Service{
Hostname: kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8080,
Protocol: protocol.TCP,
},
},
}
svc2 := &model.Service{
Hostname: kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8081,
Protocol: protocol.TCP,
},
},
}
svc3 := &model.Service{
Hostname: kube.ServiceHostname("svc3", "nsB", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8082,
Protocol: protocol.TCP,
},
},
}
svc4 := &model.Service{
Hostname: kube.ServiceHostname("svc4", "nsB", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8083,
Protocol: protocol.TCP,
},
},
}
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{
Mode: mode,
MeshWatcher: meshWatcher,
})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
nsA := "nsA"
nsB := "nsB"
// event handlers should only be triggered for services in namespaces selected for discovery
createNamespace(t, controller.client, nsA, map[string]string{"pilot-discovery": "enabled"})
createNamespace(t, controller.client, nsB, map[string]string{})
// wait for namespaces to be created
eventually(t, func() bool {
list, err := controller.client.CoreV1().Namespaces().List(context.TODO(), metaV1.ListOptions{})
if err != nil {
t.Fatalf("error listing namespaces: %v", err)
}
return len(list.Items) == 2
})
// service event handlers should trigger for svc1 and svc2
createService(controller, "svc1", nsA,
map[string]string{},
[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
createService(controller, "svc2", nsA,
map[string]string{},
[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
// service event handlers should not trigger for svc3 and svc4
createService(controller, "svc3", nsB,
map[string]string{},
[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)
createService(controller, "svc4", nsB,
map[string]string{},
[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)
expectedSvcList := []*model.Service{svc1, svc2}
eventually(t, func() bool {
svcList := controller.Services()
return servicesEqual(svcList, expectedSvcList)
})
// test updating namespace with adding discovery label
updateNamespace(t, controller.client, nsB, map[string]string{"env": "test"})
// service event handlers should trigger for svc3 and svc4
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
expectedSvcList = []*model.Service{svc1, svc2, svc3, svc4}
eventually(t, func() bool {
svcList := controller.Services()
return servicesEqual(svcList, expectedSvcList)
})
// test updating namespace by removing discovery label
updateNamespace(t, controller.client, nsA, map[string]string{"pilot-discovery": "disabled"})
// service event handlers should trigger for svc1 and svc2
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
expectedSvcList = []*model.Service{svc3, svc4}
eventually(t, func() bool {
svcList := controller.Services()
return servicesEqual(svcList, expectedSvcList)
})
})
}
}
func TestController_ServiceWithChangingDiscoveryNamespaces(t *testing.T) {
svc1 := &model.Service{
Hostname: kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8080,
Protocol: protocol.TCP,
},
},
}
svc2 := &model.Service{
Hostname: kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8081,
Protocol: protocol.TCP,
},
},
}
svc3 := &model.Service{
Hostname: kube.ServiceHostname("svc3", "nsB", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8082,
Protocol: protocol.TCP,
},
},
}
svc4 := &model.Service{
Hostname: kube.ServiceHostname("svc4", "nsC", defaultFakeDomainSuffix),
DefaultAddress: "10.0.0.1",
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8083,
Protocol: protocol.TCP,
},
},
}
updateMeshConfig := func(
meshConfig *meshconfig.MeshConfig,
expectedSvcList []*model.Service,
expectedNumSvcEvents int,
testMeshWatcher *mesh.TestWatcher,
fx *FakeXdsUpdater,
controller *FakeController,
) {
// update meshConfig
if err := testMeshWatcher.Update(meshConfig, 5); err != nil {
t.Fatalf("%v", err)
}
// assert firing of service events
for i := 0; i < expectedNumSvcEvents; i++ {
if ev := fx.Wait("service"); ev == nil {
t.Fatal("timed out waiting for service event")
}
}
eventually(t, func() bool {
svcList := controller.Services()
return servicesEqual(svcList, expectedSvcList)
})
}
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
client := kubelib.NewFakeClient()
meshWatcher := mesh.NewTestWatcher(&meshconfig.MeshConfig{})
discoveryNamespacesFilter := filter.NewDiscoveryNamespacesFilter(
client.KubeInformer().Core().V1().Namespaces().Lister(),
meshWatcher.Mesh().DiscoverySelectors,
)
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{
Client: client,
Mode: mode,
MeshWatcher: meshWatcher,
DiscoveryNamespacesFilter: discoveryNamespacesFilter,
})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
nsA := "nsA"
nsB := "nsB"
nsC := "nsC"
createNamespace(t, controller.client, nsA, map[string]string{"app": "foo"})
createNamespace(t, controller.client, nsB, map[string]string{"app": "bar"})
createNamespace(t, controller.client, nsC, map[string]string{"app": "baz"})
// wait for namespaces to be created
eventually(t, func() bool {
list, err := controller.client.CoreV1().Namespaces().List(context.TODO(), metaV1.ListOptions{})
if err != nil {
t.Fatalf("error listing namespaces: %v", err)
}
return len(list.Items) == 3
})
// assert that namespace membership has been updated
eventually(t, func() bool {
members := discoveryNamespacesFilter.GetMembers()
return members.Has(nsA) && members.Has(nsB) && members.Has(nsC)
})
// service event handlers should trigger for all svcs
createService(controller, "svc1", nsA,
map[string]string{},
[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
createService(controller, "svc2", nsA,
map[string]string{},
[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
createService(controller, "svc3", nsB,
map[string]string{},
[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
createService(controller, "svc4", nsC,
map[string]string{},
[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
expectedSvcList := []*model.Service{svc1, svc2, svc3, svc4}
eventually(t, func() bool {
svcList := controller.Services()
return servicesEqual(svcList, expectedSvcList)
})
// restrict namespaces to nsA (expect 2 delete events for svc3 and svc4)
updateMeshConfig(
&meshconfig.MeshConfig{
DiscoverySelectors: []*metaV1.LabelSelector{
{
MatchLabels: map[string]string{
"app": "foo",
},
},
},
},
[]*model.Service{svc1, svc2},
2,
meshWatcher,
fx,
controller,
)
// restrict namespaces to nsB (1 create event should trigger for nsB service and 2 delete events for nsA services)
updateMeshConfig(
&meshconfig.MeshConfig{
DiscoverySelectors: []*metaV1.LabelSelector{
{
MatchLabels: map[string]string{
"app": "bar",
},
},
},
},
[]*model.Service{svc3},
3,
meshWatcher,
fx,
controller,
)
// expand namespaces to nsA and nsB with selectors (2 create events should trigger for nsA services)
updateMeshConfig(
&meshconfig.MeshConfig{
DiscoverySelectors: []*metaV1.LabelSelector{
{
MatchExpressions: []metaV1.LabelSelectorRequirement{
{
Key: "app",
Operator: metaV1.LabelSelectorOpIn,
Values: []string{"foo", "bar"},
},
},
},
},
},
[]*model.Service{svc1, svc2, svc3},
2,
meshWatcher,
fx,
controller,
)
// permit all discovery namespaces by omitting discovery selectors (1 create event should trigger for the nsC service)
updateMeshConfig(
&meshconfig.MeshConfig{
DiscoverySelectors: []*metaV1.LabelSelector{},
},
[]*model.Service{svc1, svc2, svc3, svc4},
1,
meshWatcher,
fx,
controller,
)
})
}
}
func TestInstancesByPort_WorkloadInstances(t *testing.T) {
ctl, fx := NewFakeControllerWithOptions(FakeControllerOptions{})
go ctl.Run(ctl.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(ctl.stop, ctl.HasSynced)
defer ctl.Stop()
createServiceWithTargetPorts(ctl, "ratings", "bookinfo-ratings",
map[string]string{
annotation.AlphaKubernetesServiceAccounts.Name: "ratings",
annotation.AlphaCanonicalServiceAccounts.Name: "ratings@gserviceaccount2.com",
},
[]coreV1.ServicePort{
{
Name: "http-port",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.IntOrString{Type: intstr.String, StrVal: "http"},
},
},
map[string]string{"app": "ratings"}, t)
fx.WaitOrFail(t, "service")
wiRatings1 := &model.WorkloadInstance{
Name: "ratings-1",
Namespace: "bookinfo-ratings",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "ratings"},
Address: "2.2.2.2",
EndpointPort: 8081, // should be ignored since it doesn't define PortMap
},
}
wiRatings2 := &model.WorkloadInstance{
Name: "ratings-2",
Namespace: "bookinfo-ratings",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "ratings"},
Address: "2.2.2.2",
},
PortMap: map[string]uint32{
"http": 8082, // should be used
},
}
wiRatings3 := &model.WorkloadInstance{
Name: "ratings-3",
Namespace: "bookinfo-ratings",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "ratings"},
Address: "2.2.2.2",
},
PortMap: map[string]uint32{
"http": 8083, // should be used
},
}
for _, wi := range []*model.WorkloadInstance{wiRatings1, wiRatings2, wiRatings3} {
ctl.WorkloadInstanceHandler(wi, model.EventAdd) // simulate adding a workload entry
}
// get service object
svcs := ctl.Services()
if len(svcs) != 1 {
t.Fatalf("failed to get services (%v)", svcs)
}
// get service instances
instances := ctl.InstancesByPort(svcs[0], 8080, nil)
want := []string{"2.2.2.2:8082", "2.2.2.2:8083"} // expect both WorkloadEntries even though they have the same IP
var got []string
for _, instance := range instances {
got = append(got, net.JoinHostPort(instance.Endpoint.Address, strconv.Itoa(int(instance.Endpoint.EndpointPort))))
}
sort.Strings(got)
if diff := cmp.Diff(want, got); diff != "" {
t.Fatalf("InstancesByPort() returned unexpected list of endpoints (--want/++got): %v", diff)
}
}
func TestExternalNameServiceInstances(t *testing.T) {
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{Mode: mode})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
createExternalNameService(controller, "svc5", "nsA",
[]int32{1, 2, 3}, "foo.co", t, fx.Events)
converted := controller.Services()
if len(converted) != 1 {
t.Fatalf("failed to get services (%v)s", converted)
}
instances := controller.InstancesByPort(converted[0], 1, nil)
if len(instances) != 1 {
t.Fatalf("expected 1 instance, got %v", instances)
}
if instances[0].ServicePort.Port != 1 {
t.Fatalf("expected port 1, got %v", instances[0].ServicePort.Port)
}
})
}
}
func TestController_ExternalNameService(t *testing.T) {
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
deleteWg := sync.WaitGroup{}
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{
Mode: mode,
ServiceHandler: func(_ *model.Service, e model.Event) {
if e == model.EventDelete {
deleteWg.Done()
}
},
})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
// Use a timeout to keep the test from hanging.
k8sSvcs := []*coreV1.Service{
createExternalNameService(controller, "svc1", "nsA",
[]int32{8080}, "test-app-1.test.svc."+defaultFakeDomainSuffix, t, fx.Events),
createExternalNameService(controller, "svc2", "nsA",
[]int32{8081}, "test-app-2.test.svc."+defaultFakeDomainSuffix, t, fx.Events),
createExternalNameService(controller, "svc3", "nsA",
[]int32{8082}, "test-app-3.test.pod."+defaultFakeDomainSuffix, t, fx.Events),
createExternalNameService(controller, "svc4", "nsA",
[]int32{8083}, "g.co", t, fx.Events),
}
expectedSvcList := []*model.Service{
{
Hostname: kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8080,
Protocol: protocol.TCP,
},
},
MeshExternal: true,
Resolution: model.DNSLB,
},
{
Hostname: kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8081,
Protocol: protocol.TCP,
},
},
MeshExternal: true,
Resolution: model.DNSLB,
},
{
Hostname: kube.ServiceHostname("svc3", "nsA", defaultFakeDomainSuffix),
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8082,
Protocol: protocol.TCP,
},
},
MeshExternal: true,
Resolution: model.DNSLB,
},
{
Hostname: kube.ServiceHostname("svc4", "nsA", defaultFakeDomainSuffix),
Ports: model.PortList{
&model.Port{
Name: "tcp-port",
Port: 8083,
Protocol: protocol.TCP,
},
},
MeshExternal: true,
Resolution: model.DNSLB,
},
}
svcList := controller.Services()
if len(svcList) != len(expectedSvcList) {
t.Fatalf("Expecting %d service but got %d\r\n", len(expectedSvcList), len(svcList))
}
for i, exp := range expectedSvcList {
if exp.Hostname != svcList[i].Hostname {
t.Fatalf("got hostname of %dst service, got:\n%#v\nwanted:\n%#v\n", i+1, svcList[i].Hostname, exp.Hostname)
}
if !reflect.DeepEqual(exp.Ports, svcList[i].Ports) {
t.Fatalf("got ports of %dst service, got:\n%#v\nwanted:\n%#v\n", i+1, svcList[i].Ports, exp.Ports)
}
if svcList[i].MeshExternal != exp.MeshExternal {
t.Fatalf("i=%v, MeshExternal==%v, should be %v: externalName='%s'", i+1, exp.MeshExternal, svcList[i].MeshExternal, k8sSvcs[i].Spec.ExternalName)
}
if svcList[i].Resolution != exp.Resolution {
t.Fatalf("i=%v, Resolution=='%v', should be '%v'", i+1, svcList[i].Resolution, exp.Resolution)
}
instances := controller.InstancesByPort(svcList[i], svcList[i].Ports[0].Port, nil)
if len(instances) != 1 {
t.Fatalf("should be exactly 1 instance: len(instances) = %v", len(instances))
}
if instances[0].Endpoint.Address != k8sSvcs[i].Spec.ExternalName {
t.Fatalf("wrong instance endpoint address: '%s' != '%s'", instances[0].Endpoint.Address, k8sSvcs[i].Spec.ExternalName)
}
}
deleteWg.Add(len(k8sSvcs))
for _, s := range k8sSvcs {
deleteExternalNameService(controller, s.Name, s.Namespace, t, fx.Events)
}
deleteWg.Wait()
svcList = controller.Services()
if len(svcList) != 0 {
t.Fatalf("Should have 0 services at this point")
}
for _, exp := range expectedSvcList {
instances := controller.InstancesByPort(exp, exp.Ports[0].Port, nil)
if len(instances) != 0 {
t.Fatalf("should be exactly 0 instance: len(instances) = %v", len(instances))
}
}
})
}
}
func createEndpoints(t *testing.T, controller *FakeController, name, namespace string,
portNames, ips []string, refs []*coreV1.ObjectReference, labels map[string]string) {
if labels == nil {
labels = make(map[string]string)
}
// Add the reference to the service. Used by EndpointSlice logic only.
labels[discovery.LabelServiceName] = name
if refs == nil {
refs = make([]*coreV1.ObjectReference, len(ips))
}
var portNum int32 = 1001
eas := make([]coreV1.EndpointAddress, 0)
for i, ip := range ips {
eas = append(eas, coreV1.EndpointAddress{IP: ip, TargetRef: refs[i]})
}
eps := make([]coreV1.EndpointPort, 0)
for _, name := range portNames {
eps = append(eps, coreV1.EndpointPort{Name: name, Port: portNum})
}
endpoint := &coreV1.Endpoints{
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: labels,
},
Subsets: []coreV1.EndpointSubset{{
Addresses: eas,
Ports: eps,
}},
}
if _, err := controller.client.CoreV1().Endpoints(namespace).Create(context.TODO(), endpoint, metaV1.CreateOptions{}); err != nil {
if errors.IsAlreadyExists(err) {
_, err = controller.client.CoreV1().Endpoints(namespace).Update(context.TODO(), endpoint, metaV1.UpdateOptions{})
}
if err != nil {
t.Fatalf("failed to create endpoints %s in namespace %s (error %v)", name, namespace, err)
}
}
// Create endpoint slice as well
esps := make([]discovery.EndpointPort, 0)
for _, name := range portNames {
n := name // Create a stable reference to take the pointer from
esps = append(esps, discovery.EndpointPort{Name: &n, Port: &portNum})
}
var sliceEndpoint []discovery.Endpoint
for i, ip := range ips {
sliceEndpoint = append(sliceEndpoint, discovery.Endpoint{
Addresses: []string{ip},
TargetRef: refs[i],
})
}
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: labels,
},
Endpoints: sliceEndpoint,
Ports: esps,
}
if _, err := controller.client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metaV1.CreateOptions{}); err != nil {
if errors.IsAlreadyExists(err) {
_, err = controller.client.DiscoveryV1().EndpointSlices(namespace).Update(context.TODO(), endpointSlice, metaV1.UpdateOptions{})
}
if err != nil {
t.Fatalf("failed to create endpoint slice %s in namespace %s (error %v)", name, namespace, err)
}
}
}
func updateEndpoints(controller *FakeController, name, namespace string, portNames, ips []string, t *testing.T) {
var portNum int32 = 1001
eas := make([]coreV1.EndpointAddress, 0)
for _, ip := range ips {
eas = append(eas, coreV1.EndpointAddress{IP: ip})
}
eps := make([]coreV1.EndpointPort, 0)
for _, name := range portNames {
eps = append(eps, coreV1.EndpointPort{Name: name, Port: portNum})
}
endpoint := &coreV1.Endpoints{
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Subsets: []coreV1.EndpointSubset{{
Addresses: eas,
Ports: eps,
}},
}
if _, err := controller.client.CoreV1().Endpoints(namespace).Update(context.TODO(), endpoint, metaV1.UpdateOptions{}); err != nil {
t.Fatalf("failed to update endpoints %s in namespace %s (error %v)", name, namespace, err)
}
// Update endpoint slice as well
esps := make([]discovery.EndpointPort, 0)
for i := range portNames {
esps = append(esps, discovery.EndpointPort{Name: &portNames[i], Port: &portNum})
}
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
discovery.LabelServiceName: name,
},
},
Endpoints: []discovery.Endpoint{
{
Addresses: ips,
},
},
Ports: esps,
}
if _, err := controller.client.DiscoveryV1().EndpointSlices(namespace).Update(context.TODO(), endpointSlice, metaV1.UpdateOptions{}); err != nil {
t.Errorf("failed to create endpoint slice %s in namespace %s (error %v)", name, namespace, err)
}
}
func createServiceWithTargetPorts(controller *FakeController, name, namespace string, annotations map[string]string,
svcPorts []coreV1.ServicePort, selector map[string]string, t *testing.T) {
service := &coreV1.Service{
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Namespace: namespace,
Annotations: annotations,
},
Spec: coreV1.ServiceSpec{
ClusterIP: "10.0.0.1", // FIXME: generate?
Ports: svcPorts,
Selector: selector,
Type: coreV1.ServiceTypeClusterIP,
},
}
_, err := controller.client.CoreV1().Services(namespace).Create(context.TODO(), service, metaV1.CreateOptions{})
if err != nil {
t.Fatalf("Cannot create service %s in namespace %s (error: %v)", name, namespace, err)
}
}
func createService(controller *FakeController, name, namespace string, annotations map[string]string,
ports []int32, selector map[string]string, t *testing.T) {
svcPorts := make([]coreV1.ServicePort, 0)
for _, p := range ports {
svcPorts = append(svcPorts, coreV1.ServicePort{
Name: "tcp-port",
Port: p,
Protocol: "http",
})
}
service := &coreV1.Service{
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Namespace: namespace,
Annotations: annotations,
},
Spec: coreV1.ServiceSpec{
ClusterIP: "10.0.0.1", // FIXME: generate?
Ports: svcPorts,
Selector: selector,
Type: coreV1.ServiceTypeClusterIP,
},
}
_, err := controller.client.CoreV1().Services(namespace).Create(context.TODO(), service, metaV1.CreateOptions{})
if err != nil {
t.Fatalf("Cannot create service %s in namespace %s (error: %v)", name, namespace, err)
}
}
func getService(controller *FakeController, name, namespace string, t *testing.T) *coreV1.Service {
svc, err := controller.client.CoreV1().Services(namespace).Get(context.TODO(), name, metaV1.GetOptions{})
if err != nil {
t.Fatalf("Cannot get service %s in namespace %s (error: %v)", name, namespace, err)
}
return svc
}
func updateService(controller *FakeController, svc *coreV1.Service, t *testing.T) *coreV1.Service {
svc, err := controller.client.CoreV1().Services(svc.Namespace).Update(context.TODO(), svc, metaV1.UpdateOptions{})
if err != nil {
t.Fatalf("Cannot update service %s in namespace %s (error: %v)", svc.Name, svc.Namespace, err)
}
return svc
}
func createServiceWithoutClusterIP(controller *FakeController, name, namespace string, annotations map[string]string,
ports []int32, selector map[string]string, t *testing.T) {
svcPorts := make([]coreV1.ServicePort, 0)
for _, p := range ports {
svcPorts = append(svcPorts, coreV1.ServicePort{
Name: "tcp-port",
Port: p,
Protocol: "http",
})
}
service := &coreV1.Service{
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Namespace: namespace,
Annotations: annotations,
},
Spec: coreV1.ServiceSpec{
ClusterIP: coreV1.ClusterIPNone,
Ports: svcPorts,
Selector: selector,
Type: coreV1.ServiceTypeClusterIP,
},
}
_, err := controller.client.CoreV1().Services(namespace).Create(context.TODO(), service, metaV1.CreateOptions{})
if err != nil {
t.Fatalf("Cannot create service %s in namespace %s (error: %v)", name, namespace, err)
}
}
// nolint: unparam
func createExternalNameService(controller *FakeController, name, namespace string,
ports []int32, externalName string, t *testing.T, xdsEvents <-chan FakeXdsEvent) *coreV1.Service {
defer func() {
<-xdsEvents
}()
svcPorts := make([]coreV1.ServicePort, 0)
for _, p := range ports {
svcPorts = append(svcPorts, coreV1.ServicePort{
Name: "tcp-port",
Port: p,
Protocol: "http",
})
}
service := &coreV1.Service{
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: coreV1.ServiceSpec{
Ports: svcPorts,
Type: coreV1.ServiceTypeExternalName,
ExternalName: externalName,
},
}
_, err := controller.client.CoreV1().Services(namespace).Create(context.TODO(), service, metaV1.CreateOptions{})
if err != nil {
t.Fatalf("Cannot create service %s in namespace %s (error: %v)", name, namespace, err)
}
return service
}
func deleteExternalNameService(controller *FakeController, name, namespace string, t *testing.T, xdsEvents <-chan FakeXdsEvent) {
defer func() {
<-xdsEvents
}()
err := controller.client.CoreV1().Services(namespace).Delete(context.TODO(), name, metaV1.DeleteOptions{})
if err != nil {
t.Fatalf("Cannot delete service %s in namespace %s (error: %v)", name, namespace, err)
}
}
func servicesEqual(svcList, expectedSvcList []*model.Service) bool {
if len(svcList) != len(expectedSvcList) {
return false
}
for i, exp := range expectedSvcList {
if exp.Hostname != svcList[i].Hostname {
return false
}
if exp.DefaultAddress != svcList[i].DefaultAddress {
return false
}
if !reflect.DeepEqual(exp.Ports, svcList[i].Ports) {
return false
}
}
return true
}
func addPods(t *testing.T, controller *FakeController, fx *FakeXdsUpdater, pods ...*coreV1.Pod) {
for _, pod := range pods {
p, _ := controller.client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metaV1.GetOptions{})
var newPod *coreV1.Pod
var err error
if p == nil {
newPod, err = controller.client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metaV1.CreateOptions{})
if err != nil {
t.Fatalf("Cannot create %s in namespace %s (error: %v)", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace, err)
}
} else {
newPod, err = controller.client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metaV1.UpdateOptions{})
if err != nil {
t.Fatalf("Cannot update %s in namespace %s (error: %v)", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace, err)
}
}
setPodReady(newPod)
// Apiserver doesn't allow Create/Update to modify the pod status. Creating doesn't result in
// events - since PodIP will be "".
newPod.Status.PodIP = pod.Status.PodIP
newPod.Status.Phase = coreV1.PodRunning
_, _ = controller.client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), newPod, metaV1.UpdateOptions{})
if err := waitForPod(controller, pod.Status.PodIP); err != nil {
t.Fatal(err)
}
// pod first time occur will trigger proxy push
fx.Wait("proxy")
}
}
func setPodReady(pod *coreV1.Pod) {
pod.Status.Conditions = []coreV1.PodCondition{
{
Type: coreV1.PodReady,
Status: coreV1.ConditionTrue,
LastTransitionTime: metaV1.Now(),
},
}
}
func generatePod(ip, name, namespace, saName, node string, labels map[string]string, annotations map[string]string) *coreV1.Pod {
automount := false
return &coreV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Labels: labels,
Annotations: annotations,
Namespace: namespace,
},
Spec: coreV1.PodSpec{
ServiceAccountName: saName,
NodeName: node,
AutomountServiceAccountToken: &automount,
// Validation requires this
Containers: []coreV1.Container{
{
Name: "test",
Image: "ununtu",
},
},
},
// The cache controller uses this as key, required by our impl.
Status: coreV1.PodStatus{
Conditions: []coreV1.PodCondition{
{
Type: coreV1.PodReady,
Status: coreV1.ConditionTrue,
LastTransitionTime: metaV1.Now(),
},
},
PodIP: ip,
HostIP: ip,
Phase: coreV1.PodRunning,
},
}
}
func generateNode(name string, labels map[string]string) *coreV1.Node {
return &coreV1.Node{
TypeMeta: metaV1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
},
ObjectMeta: metaV1.ObjectMeta{
Name: name,
Labels: labels,
},
}
}
func addNodes(t *testing.T, controller *FakeController, nodes ...*coreV1.Node) {
fakeClient := controller.client
for _, node := range nodes {
_, err := fakeClient.CoreV1().Nodes().Create(context.TODO(), node, metaV1.CreateOptions{})
if errors.IsAlreadyExists(err) {
if _, err := fakeClient.CoreV1().Nodes().Update(context.TODO(), node, metaV1.UpdateOptions{}); err != nil {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
if err := waitForNode(controller, node.Name); err != nil {
t.Fatal(err)
}
}
}
func TestEndpointUpdate(t *testing.T) {
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{Mode: mode})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
pod1 := generatePod("128.0.0.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
pods := []*coreV1.Pod{pod1}
addPods(t, controller, fx, pods...)
// 1. incremental eds for normal service endpoint update
createService(controller, "svc1", "nsa", nil,
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
// Endpoints are generated by Kubernetes from pod labels and service selectors.
// Here we manually create them for mocking purpose.
svc1Ips := []string{"128.0.0.1"}
portNames := []string{"tcp-port"}
// Create 1 endpoint that refers to a pod in the same namespace.
createEndpoints(t, controller, "svc1", "nsa", portNames, svc1Ips, nil, nil)
if ev := fx.Wait("eds"); ev == nil {
t.Fatalf("Timeout incremental eds")
}
// delete normal service
err := controller.client.CoreV1().Services("nsa").Delete(context.TODO(), "svc1", metaV1.DeleteOptions{})
if err != nil {
t.Fatalf("Cannot delete service (error: %v)", err)
}
if ev := fx.Wait("service"); ev == nil {
t.Fatalf("Timeout deleting service")
}
// 2. full xds push request for headless service endpoint update
// create a headless service
createServiceWithoutClusterIP(controller, "svc1", "nsa", nil,
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatalf("Timeout creating service")
}
// Create 1 endpoint that refers to a pod in the same namespace.
svc1Ips = append(svc1Ips, "128.0.0.2")
updateEndpoints(controller, "svc1", "nsa", portNames, svc1Ips, t)
ev := fx.Wait("xds")
if ev == nil {
t.Fatalf("Timeout xds push")
}
if ev.ID != string(kube.ServiceHostname("svc1", "nsa", controller.opts.DomainSuffix)) {
t.Errorf("Expect service %s updated, but got %s",
kube.ServiceHostname("svc1", "nsa", controller.opts.DomainSuffix), ev.ID)
}
})
}
}
// Validates that when Pilot sees Endpoint before the corresponding Pod, it triggers endpoint event on pod event.
func TestEndpointUpdateBeforePodUpdate(t *testing.T) {
for mode, name := range EndpointModeNames {
mode := mode
t.Run(name, func(t *testing.T) {
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{Mode: mode})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
// Setup kube caches
defer controller.Stop()
addNodes(t, controller, generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}))
// Setup help functions to make the test more explicit
addPod := func(name, ip string) {
pod := generatePod(ip, name, "nsA", name, "node1", map[string]string{"app": "prod-app"}, map[string]string{})
addPods(t, controller, fx, pod)
}
deletePod := func(name, ip string) {
if err := controller.client.CoreV1().Pods("nsA").Delete(context.TODO(), name, metaV1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
retry.UntilSuccessOrFail(t, func() error {
controller.pods.RLock()
defer controller.pods.RUnlock()
if _, ok := controller.pods.podsByIP[ip]; ok {
return fmt.Errorf("pod still present")
}
return nil
}, retry.Timeout(time.Second))
}
addService := func(name string) {
// create service
createService(controller, name, "nsA", nil,
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
}
addEndpoint := func(svcName string, ips []string, pods []string) {
var refs []*coreV1.ObjectReference
for _, pod := range pods {
if pod == "" {
refs = append(refs, nil)
} else {
refs = append(refs, &coreV1.ObjectReference{
Kind: "Pod",
Namespace: "nsA",
Name: pod,
})
}
}
createEndpoints(t, controller, svcName, "nsA", []string{"tcp-port"}, ips, refs, nil)
}
assertEndpointsEvent := func(ips []string, pods []string) {
t.Helper()
ev := fx.Wait("eds")
if ev == nil {
t.Fatalf("Timeout incremental eds")
}
var gotIps []string
for _, e := range ev.Endpoints {
gotIps = append(gotIps, e.Address)
}
var gotSA []string
var expectedSa []string
for _, e := range pods {
if e == "" {
expectedSa = append(expectedSa, "")
} else {
expectedSa = append(expectedSa, "spiffe://cluster.local/ns/nsA/sa/"+e)
}
}
for _, e := range ev.Endpoints {
gotSA = append(gotSA, e.ServiceAccount)
}
if !reflect.DeepEqual(gotIps, ips) {
t.Fatalf("expected ips %v, got %v", ips, gotIps)
}
if !reflect.DeepEqual(gotSA, expectedSa) {
t.Fatalf("expected SAs %v, got %v", expectedSa, gotSA)
}
}
assertPendingResync := func(expected int) {
t.Helper()
retry.UntilSuccessOrFail(t, func() error {
controller.pods.RLock()
defer controller.pods.RUnlock()
if len(controller.pods.needResync) != expected {
return fmt.Errorf("expected %d pods needing resync, got %d", expected, len(controller.pods.needResync))
}
return nil
}, retry.Timeout(time.Second))
}
// standard ordering
addService("svc")
addPod("pod1", "172.0.1.1")
addEndpoint("svc", []string{"172.0.1.1"}, []string{"pod1"})
assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
fx.Clear()
// Create the endpoint, then later add the pod. Should eventually get an update for the endpoint
addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
fx.Clear()
addPod("pod2", "172.0.1.2")
assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
fx.Clear()
// Create the endpoint without a pod reference. We should see it immediately
addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", ""})
assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", ""})
fx.Clear()
// Delete a pod before the endpoint
addEndpoint("svc", []string{"172.0.1.1"}, []string{"pod1"})
deletePod("pod2", "172.0.1.2")
assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
fx.Clear()
// add another service
addService("other")
// Add endpoints for the new service, and the old one. Both should be missing the last IP
addEndpoint("other", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
fx.Clear()
// Add the pod, expect the endpoints update for both
addPod("pod2", "172.0.1.2")
assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
// Check for memory leaks
assertPendingResync(0)
addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", "pod3"})
// This is really an implementation detail here - but checking to sanity check our test
assertPendingResync(1)
// Remove the endpoint again, with no pod events in between. Should have no memory leaks
addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
// TODO this case would leak
// assertPendingResync(0)
// completely remove the endpoint
addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", "pod3"})
assertPendingResync(1)
if err := controller.client.CoreV1().Endpoints("nsA").Delete(context.TODO(), "svc", metaV1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
if err := controller.client.DiscoveryV1().EndpointSlices("nsA").Delete(context.TODO(), "svc", metaV1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
assertPendingResync(0)
})
}
}
func TestWorkloadInstanceHandlerMultipleEndpoints(t *testing.T) {
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
// Create an initial pod with a service, and endpoint.
pod1 := generatePod("172.0.1.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
pod2 := generatePod("172.0.1.2", "pod2", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
pods := []*coreV1.Pod{pod1, pod2}
nodes := []*coreV1.Node{
generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
}
addNodes(t, controller, nodes...)
addPods(t, controller, fx, pods...)
createService(controller, "svc1", "nsA", nil,
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
pod1Ips := []string{"172.0.1.1"}
portNames := []string{"tcp-port"}
createEndpoints(t, controller, "svc1", "nsA", portNames, pod1Ips, nil, nil)
if ev := fx.Wait("eds"); ev == nil {
t.Fatal("Timeout incremental eds")
}
// Simulate adding a workload entry (fired through invocation of WorkloadInstanceHandler)
controller.WorkloadInstanceHandler(&model.WorkloadInstance{
Namespace: "nsA",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "prod-app"},
ServiceAccount: "account",
Address: "2.2.2.2",
EndpointPort: 8080,
},
}, model.EventAdd)
expectedEndpointIPs := []string{"172.0.1.1", "2.2.2.2"}
// Check if an EDS event is fired
if ev := fx.Wait("eds"); ev == nil {
t.Fatal("Did not get eds event when workload entry was added")
} else {
// check if the hostname matches that of k8s service svc1.nsA
if ev.ID != "svc1.nsA.svc.company.com" {
t.Fatalf("eds event for workload entry addition did not match the expected service. got %s, want %s",
ev.ID, "svc1.nsA.svc.company.com")
}
// we should have the pod IP and the workload Entry's IP in the endpoints..
// the first endpoint should be that of the k8s pod and the second one should be the workload entry
var gotEndpointIPs []string
for _, ep := range ev.Endpoints {
gotEndpointIPs = append(gotEndpointIPs, ep.Address)
}
if !reflect.DeepEqual(gotEndpointIPs, expectedEndpointIPs) {
t.Fatalf("eds update after adding workload entry did not match expected list. got %v, want %v",
gotEndpointIPs, expectedEndpointIPs)
}
}
// Check if InstancesByPort returns the same list
converted := controller.Services()
if len(converted) != 1 {
t.Fatalf("failed to get services (%v), converted", converted)
}
instances := controller.InstancesByPort(converted[0], 8080, labels.Instance{
"app": "prod-app",
})
var gotEndpointIPs []string
for _, instance := range instances {
gotEndpointIPs = append(gotEndpointIPs, instance.Endpoint.Address)
}
if !reflect.DeepEqual(gotEndpointIPs, expectedEndpointIPs) {
t.Fatalf("InstancesByPort after adding workload entry did not match expected list. got %v, want %v",
gotEndpointIPs, expectedEndpointIPs)
}
// Now add a k8s pod to the service and ensure that eds updates contain both pod IPs and workload entry IPs.
updateEndpoints(controller, "svc1", "nsA", portNames, []string{"172.0.1.1", "172.0.1.2"}, t)
if ev := fx.Wait("eds"); ev == nil {
t.Fatal("Timeout incremental eds")
} else {
var gotEndpointIPs []string
for _, ep := range ev.Endpoints {
gotEndpointIPs = append(gotEndpointIPs, ep.Address)
}
expectedEndpointIPs = []string{"172.0.1.1", "172.0.1.2", "2.2.2.2"}
if !reflect.DeepEqual(gotEndpointIPs, expectedEndpointIPs) {
t.Fatalf("eds update after adding pod did not match expected list. got %v, want %v",
gotEndpointIPs, expectedEndpointIPs)
}
}
}
func TestWorkloadInstanceHandler_WorkloadInstanceIndex(t *testing.T) {
ctl, _ := NewFakeControllerWithOptions(FakeControllerOptions{})
go ctl.Run(ctl.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(ctl.stop, ctl.HasSynced)
defer ctl.Stop()
verifyGetByIP := func(address string, want []*model.WorkloadInstance) {
got := ctl.workloadInstancesIndex.GetByIP(address)
if diff := cmp.Diff(want, got); diff != "" {
t.Fatalf("workload index is not valid (--want/++got): %v", diff)
}
}
wi1 := &model.WorkloadInstance{
Name: "ratings-1",
Namespace: "bookinfo",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "ratings"},
Address: "2.2.2.2",
EndpointPort: 8080,
},
}
// simulate adding a workload entry
ctl.WorkloadInstanceHandler(wi1, model.EventAdd)
verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi1})
wi2 := &model.WorkloadInstance{
Name: "details-1",
Namespace: "bookinfo",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "details"},
Address: "3.3.3.3",
EndpointPort: 9090,
},
}
// simulate adding a workload entry
ctl.WorkloadInstanceHandler(wi2, model.EventAdd)
verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi1})
verifyGetByIP("3.3.3.3", []*model.WorkloadInstance{wi2})
wi3 := &model.WorkloadInstance{
Name: "details-1",
Namespace: "bookinfo",
Endpoint: &model.IstioEndpoint{
Labels: labels.Instance{"app": "details"},
Address: "2.2.2.2", // update IP
EndpointPort: 9090,
},
}
// simulate updating a workload entry
ctl.WorkloadInstanceHandler(wi3, model.EventUpdate)
verifyGetByIP("3.3.3.3", nil)
verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi3, wi1})
// simulate deleting a workload entry
ctl.WorkloadInstanceHandler(wi3, model.EventDelete)
verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi1})
// simulate deleting a workload entry
ctl.WorkloadInstanceHandler(wi1, model.EventDelete)
verifyGetByIP("2.2.2.2", nil)
}
func TestKubeEndpointsControllerOnEvent(t *testing.T) {
testCases := []struct {
mode EndpointMode
tombstone cache.DeletedFinalStateUnknown
}{
{
mode: EndpointsOnly,
tombstone: cache.DeletedFinalStateUnknown{
Key: "namespace/name",
Obj: &coreV1.Endpoints{},
},
},
{
mode: EndpointSliceOnly,
tombstone: cache.DeletedFinalStateUnknown{
Key: "namespace/name",
Obj: &discovery.EndpointSlice{},
},
},
}
for _, tc := range testCases {
t.Run(EndpointModeNames[tc.mode], func(t *testing.T) {
controller, _ := NewFakeControllerWithOptions(FakeControllerOptions{Mode: tc.mode})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
if err := controller.endpoints.onEvent(tc.tombstone, model.EventDelete); err != nil {
t.Errorf("unexpected error: %v", err)
}
})
}
}
func TestUpdateEdsCacheOnServiceUpdate(t *testing.T) {
controller, fx := NewFakeControllerWithOptions(FakeControllerOptions{})
go controller.Run(controller.stop)
// Wait for the caches to sync, otherwise we may hit race conditions where events are dropped
cache.WaitForCacheSync(controller.stop, controller.HasSynced)
defer controller.Stop()
// Create an initial pod with a service, and endpoint.
pod1 := generatePod("172.0.1.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
pod2 := generatePod("172.0.1.2", "pod2", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
pods := []*coreV1.Pod{pod1, pod2}
nodes := []*coreV1.Node{
generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
}
addNodes(t, controller, nodes...)
addPods(t, controller, fx, pods...)
createService(controller, "svc1", "nsA", nil,
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
if ev := fx.Wait("service"); ev == nil {
t.Fatal("Timeout creating service")
}
pod1Ips := []string{"172.0.1.1"}
portNames := []string{"tcp-port"}
createEndpoints(t, controller, "svc1", "nsA", portNames, pod1Ips, nil, nil)
if ev := fx.Wait("eds"); ev == nil {
t.Fatal("Timeout incremental eds")
}
// update service selector
svc := getService(controller, "svc1", "nsA", t)
svc.Spec.Selector = map[string]string{
"app": "prod-app",
"foo": "bar",
}
svc.Spec.Selector = map[string]string{
"app": "prod-app",
}
updateService(controller, svc, t)
if ev := fx.Wait("eds cache"); ev == nil {
t.Fatal("Timeout updating eds cache")
}
}
func clearDiscoverabilityPolicy(ep *model.IstioEndpoint) {
if ep != nil {
ep.DiscoverabilityPolicy = nil
}
}