blob: f88797ce3f7611bc4a5f78ebe6ee27717fb0e7d0 [file] [log] [blame]
// Copyright Istio Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
import (
envoyCore ""
. ""
kubeMeta ""
mcsapi ""
import (
const (
serviceImportName = "test-svc"
serviceImportNamespace = "test-ns"
serviceImportPodIP = ""
serviceImportCluster = "test-cluster"
var (
serviceImportNamespacedName = types.NamespacedName{
Namespace: serviceImportNamespace,
Name: serviceImportName,
serviceImportClusterSetHost = serviceClusterSetLocalHostname(serviceImportNamespacedName)
serviceImportVIPs = []string{""}
serviceImportTimeout = retry.Timeout(2 * time.Second)
func TestServiceNotImported(t *testing.T) {
for _, mode := range []EndpointMode{EndpointsOnly, EndpointSliceOnly} {
t.Run(mode.String(), func(t *testing.T) {
c, ic := newTestServiceImportCache(t, mode)
ic.createKubeService(t, c)
// Check that the service does not have ClusterSet IPs.
func TestServiceImportedAfterCreated(t *testing.T) {
for _, mode := range []EndpointMode{EndpointsOnly, EndpointSliceOnly} {
t.Run(mode.String(), func(t *testing.T) {
c, ic := newTestServiceImportCache(t, mode)
ic.createKubeService(t, c)
ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
// Check that the service has been assigned ClusterSet IPs.
func TestServiceCreatedAfterImported(t *testing.T) {
for _, mode := range []EndpointMode{EndpointsOnly, EndpointSliceOnly} {
t.Run(mode.String(), func(t *testing.T) {
c, ic := newTestServiceImportCache(t, mode)
ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
ic.createKubeService(t, c)
// Check that the service has been assigned ClusterSet IPs.
func TestUpdateImportedService(t *testing.T) {
for _, mode := range []EndpointMode{EndpointsOnly, EndpointSliceOnly} {
t.Run(mode.String(), func(t *testing.T) {
c, ic := newTestServiceImportCache(t, mode)
ic.createKubeService(t, c)
ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
// Update the k8s service and verify that both services are updated.
func TestHeadlessServiceImported(t *testing.T) {
for _, mode := range []EndpointMode{EndpointsOnly, EndpointSliceOnly} {
t.Run(mode.String(), func(t *testing.T) {
// Create and run the controller.
c, ic := newTestServiceImportCache(t, mode)
ic.createKubeService(t, c)
ic.createServiceImport(t, mcsapi.Headless, nil)
// Verify that we did not generate the synthetic service for the headless service.
func TestDeleteImportedService(t *testing.T) {
for _, mode := range []EndpointMode{EndpointsOnly, EndpointSliceOnly} {
t.Run(mode.String(), func(t *testing.T) {
// Create and run the controller.
c1, ic := newTestServiceImportCache(t, mode)
// Create and run another controller.
c2, _ := NewFakeControllerWithOptions(FakeControllerOptions{
Stop: c1.stop,
ClusterID: "test-cluster2",
Mode: mode,
go c2.Run(c2.stop)
cache.WaitForCacheSync(c2.stop, c2.HasSynced)
c1.opts.MeshServiceController.AddRegistryAndRun(c2, c2.stop)
ic.createKubeService(t, c1)
ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
// create the same service in cluster2
createService(c2, serviceImportName, serviceImportNamespace, map[string]string{},
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
// Delete the k8s service and verify that all internal services are removed.
ic.deleteKubeService(t, c2)
func TestUnimportService(t *testing.T) {
for _, mode := range []EndpointMode{EndpointsOnly, EndpointSliceOnly} {
t.Run(mode.String(), func(t *testing.T) {
// Create and run the controller.
c, ic := newTestServiceImportCache(t, mode)
ic.createKubeService(t, c)
ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
func TestAddServiceImportVIPs(t *testing.T) {
for _, mode := range []EndpointMode{EndpointsOnly, EndpointSliceOnly} {
t.Run(mode.String(), func(t *testing.T) {
// Create and run the controller.
c, ic := newTestServiceImportCache(t, mode)
ic.createKubeService(t, c)
ic.createServiceImport(t, mcsapi.ClusterSetIP, nil)
ic.setServiceImportVIPs(t, serviceImportVIPs)
func TestUpdateServiceImportVIPs(t *testing.T) {
for _, mode := range []EndpointMode{EndpointsOnly, EndpointSliceOnly} {
t.Run(mode.String(), func(t *testing.T) {
// Create and run the controller.
c, ic := newTestServiceImportCache(t, mode)
ic.createKubeService(t, c)
ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
updatedVIPs := []string{"", ""}
ic.setServiceImportVIPs(t, updatedVIPs)
func newTestServiceImportCache(t test.Failer, mode EndpointMode) (c *FakeController, ic *serviceImportCacheImpl) {
stopCh := make(chan struct{})
test.SetBoolForTest(t, &features.EnableMCSHost, true)
t.Cleanup(func() {
c, _ = NewFakeControllerWithOptions(FakeControllerOptions{
Stop: stopCh,
ClusterID: serviceImportCluster,
Mode: mode,
go c.Run(c.stop)
cache.WaitForCacheSync(c.stop, c.HasSynced)
ic = c.imports.(*serviceImportCacheImpl)
func (ic *serviceImportCacheImpl) createKubeService(t *testing.T, c *FakeController) {
// Create the test service and endpoints.
createService(c, serviceImportName, serviceImportNamespace, map[string]string{},
[]int32{8080}, map[string]string{"app": "prod-app"}, t)
createEndpoints(t, c, serviceImportName, serviceImportNamespace, []string{"tcp-port"}, []string{serviceImportPodIP}, nil, nil)
isImported := ic.isImported(serviceImportNamespacedName)
// Wait for the resources to be processed by the controller.
retry.UntilSuccessOrFail(t, func() error {
clusterLocalHost := ic.clusterLocalHost()
if svc := c.GetService(clusterLocalHost); svc == nil {
return fmt.Errorf("failed looking up service for host %s", clusterLocalHost)
var expectedHosts map[host.Name]struct{}
if isImported {
expectedHosts = map[host.Name]struct{}{
clusterLocalHost: {},
serviceImportClusterSetHost: {},
} else {
expectedHosts = map[host.Name]struct{}{
clusterLocalHost: {},
instances := ic.getProxyServiceInstances()
if len(instances) != len(expectedHosts) {
return fmt.Errorf("expected 1 service instance, found %d", len(instances))
for _, si := range instances {
if si.Service == nil {
return fmt.Errorf("proxy ServiceInstance has nil service")
if si.Endpoint == nil {
return fmt.Errorf("proxy ServiceInstance has nil endpoint")
if _, found := expectedHosts[si.Service.Hostname]; !found {
return fmt.Errorf("found proxy ServiceInstance for unexpected host: %s", si.Service.Hostname)
delete(expectedHosts, si.Service.Hostname)
if len(expectedHosts) > 0 {
return fmt.Errorf("failed to find proxy ServiceInstances for hosts: %v", expectedHosts)
return nil
}, serviceImportTimeout)
func (ic *serviceImportCacheImpl) updateKubeService(t *testing.T) {
svc, _ := ic.client.CoreV1().Services(serviceImportNamespace).Get(context.TODO(), serviceImportName, kubeMeta.GetOptions{})
if svc == nil {
t.Fatalf("failed to find k8s service: %s/%s", serviceImportNamespace, serviceImportName)
// Just add a new label.
svc.Labels = map[string]string{
"foo": "bar",
if _, err := ic.client.CoreV1().Services(serviceImportNamespace).Update(context.TODO(), svc, kubeMeta.UpdateOptions{}); err != nil {
hostNames := []host.Name{
// Wait for the services to pick up the label.
retry.UntilSuccessOrFail(t, func() error {
for _, hostName := range hostNames {
svc := ic.GetService(hostName)
if svc == nil {
return fmt.Errorf("failed to find service for host %s", hostName)
if svc.Attributes.Labels["foo"] != "bar" {
return fmt.Errorf("service not updated for %s", hostName)
return nil
}, serviceImportTimeout)
func (ic *serviceImportCacheImpl) deleteKubeService(t *testing.T, anotherCluster *FakeController) {
if err := anotherCluster.client.CoreV1().Services(serviceImportNamespace).Delete(context.TODO(), serviceImportName, kubeMeta.DeleteOptions{}); err != nil {
// Wait for the resources to be processed by the controller.
if err := ic.client.CoreV1().Services(serviceImportNamespace).Delete(context.TODO(), serviceImportName, kubeMeta.DeleteOptions{}); err != nil {
// Wait for the resources to be processed by the controller.
retry.UntilSuccessOrFail(t, func() error {
if svc := ic.GetService(ic.clusterLocalHost()); svc != nil {
return fmt.Errorf("found deleted service for host %s", ic.clusterLocalHost())
if svc := ic.GetService(serviceImportClusterSetHost); svc != nil {
return fmt.Errorf("found deleted service for host %s", serviceImportClusterSetHost)
instances := ic.getProxyServiceInstances()
if len(instances) != 0 {
return fmt.Errorf("expected 0 service instance, found %d", len(instances))
return nil
}, serviceImportTimeout)
func (ic *serviceImportCacheImpl) getProxyServiceInstances() []*model.ServiceInstance {
return ic.GetProxyServiceInstances(&model.Proxy{
Type: model.SidecarProxy,
IPAddresses: []string{serviceImportPodIP},
Locality: &envoyCore.Locality{Region: "r", Zone: "z"},
ConfigNamespace: serviceImportNamespace,
Metadata: &model.NodeMetadata{
ServiceAccount: "account",
ClusterID: ic.Cluster(),
Labels: map[string]string{
"app": "prod-app",
label.SecurityTlsMode.Name: "mutual",
func (ic *serviceImportCacheImpl) getServiceImport(t *testing.T) *mcsapi.ServiceImport {
// Get the ServiceImport as unstructured
u, err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Get(
context.TODO(), serviceImportName, kubeMeta.GetOptions{})
if err != nil {
return nil
// Convert to ServiceImport
si := &mcsapi.ServiceImport{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, si); err != nil {
return si
func (ic *serviceImportCacheImpl) checkServiceInstances(t *testing.T) {
g := NewWithT(t)
si := ic.getServiceImport(t)
var expectedIPs []string
expectedServiceCount := 1
expectMCSService := false
if si != nil && si.Spec.Type == mcsapi.ClusterSetIP && len(si.Spec.IPs) > 0 {
expectedIPs = si.Spec.IPs
expectedServiceCount = 2
expectMCSService = true
instances := ic.getProxyServiceInstances()
for _, inst := range instances {
svc := inst.Service
if svc.Hostname == serviceImportClusterSetHost {
if !expectMCSService {
t.Fatalf("found ServiceInstance for unimported service %s", serviceImportClusterSetHost)
// Check the ClusterSet IPs.
if expectMCSService {
t.Fatalf("failed finding ServiceInstance for %s", serviceImportClusterSetHost)
func (ic *serviceImportCacheImpl) createServiceImport(t *testing.T, importType mcsapi.ServiceImportType, vips []string) {
// Create the ServiceImport resource in the cluster.
_, err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Create(context.TODO(),
newServiceImport(importType, vips),
if err != nil {
shouldCreateMCSService := importType == mcsapi.ClusterSetIP && len(vips) > 0 &&
ic.GetService(ic.clusterLocalHost()) != nil
// Wait for the import to be processed by the controller.
retry.UntilSuccessOrFail(t, func() error {
if !ic.isImported(serviceImportNamespacedName) {
return fmt.Errorf("serviceImport not found for %s", serviceImportClusterSetHost)
if shouldCreateMCSService && ic.GetService(serviceImportClusterSetHost) == nil {
return fmt.Errorf("failed to find service for %s", serviceImportClusterSetHost)
return nil
}, serviceImportTimeout)
if shouldCreateMCSService {
// Wait for the XDS event.
func (ic *serviceImportCacheImpl) setServiceImportVIPs(t *testing.T, vips []string) {
// Get the ServiceImport
si := ic.getServiceImport(t)
// Apply the ClusterSet IPs.
si.Spec.IPs = vips
if _, err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Update(
context.TODO(), toUnstructured(si), kubeMeta.UpdateOptions{}); err != nil {
if len(vips) > 0 {
// Wait for the import to be processed by the controller.
retry.UntilSuccessOrFail(t, func() error {
svc := ic.GetService(serviceImportClusterSetHost)
if svc == nil {
return fmt.Errorf("failed to find service for %s", serviceImportClusterSetHost)
actualVIPs := svc.ClusterVIPs.GetAddressesFor(ic.Cluster())
if !reflect.DeepEqual(vips, actualVIPs) {
return fmt.Errorf("expected ClusterSet VIPs %v, but found %v", vips, actualVIPs)
return nil
}, serviceImportTimeout)
// Wait for the XDS event.
} else {
// Wait for the import to be processed by the controller.
retry.UntilSuccessOrFail(t, func() error {
if svc := ic.GetService(serviceImportClusterSetHost); svc != nil {
return fmt.Errorf("found unexpected service for %s", serviceImportClusterSetHost)
return nil
}, serviceImportTimeout)
func (ic *serviceImportCacheImpl) unimportService(t *testing.T) {
if err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Delete(
context.TODO(), serviceImportName, kubeMeta.DeleteOptions{}); err != nil {
// Wait for the import to be processed by the controller.
retry.UntilSuccessOrFail(t, func() error {
if ic.isImported(serviceImportNamespacedName) {
return fmt.Errorf("serviceImport found for %s", serviceImportClusterSetHost)
if ic.GetService(serviceImportClusterSetHost) != nil {
return fmt.Errorf("found MCS service for unimported service %s", serviceImportClusterSetHost)
return nil
}, serviceImportTimeout)
func (ic *serviceImportCacheImpl) isImported(name types.NamespacedName) bool {
_, err := ic.lister.ByNamespace(name.Namespace).Get(name.Name)
return err == nil
func (ic *serviceImportCacheImpl) waitForXDS(t *testing.T) {
retry.UntilSuccessOrFail(t, func() error {
return ic.checkXDS()
}, serviceImportTimeout)
func (ic *serviceImportCacheImpl) checkXDS() error {
event := ic.opts.XDSUpdater.(*FakeXdsUpdater).Wait("service")
if event == nil {
return errors.New("failed waiting for XDS event")
// The name of the event will be the cluster-local hostname.
eventID := serviceImportClusterSetHost.String()
if event.ID != eventID {
return fmt.Errorf("waitForXDS failed: expected event id=%s, but found %s", eventID, event.ID)
return nil
func (ic *serviceImportCacheImpl) clusterLocalHost() host.Name {
return kube.ServiceHostname(serviceImportName, serviceImportNamespace, ic.opts.DomainSuffix)
func newServiceImport(importType mcsapi.ServiceImportType, vips []string) *unstructured.Unstructured {
si := &mcsapi.ServiceImport{
TypeMeta: kubeMeta.TypeMeta{
Kind: "ServiceImport",
APIVersion: "",
ObjectMeta: kubeMeta.ObjectMeta{
Name: serviceImportName,
Namespace: serviceImportNamespace,
Spec: mcsapi.ServiceImportSpec{
Type: importType,
IPs: vips,
return toUnstructured(si)
func toUnstructured(o interface{}) *unstructured.Unstructured {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o)
if err != nil {
return &unstructured.Unstructured{Object: u}