| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package scaffold |
| |
| import ( |
| "bytes" |
| "fmt" |
| "os" |
| "time" |
| |
| "github.com/gruntwork-io/terratest/modules/k8s" |
| . "github.com/onsi/ginkgo/v2" //nolint:staticcheck |
| . "github.com/onsi/gomega" //nolint:staticcheck |
| corev1 "k8s.io/api/core/v1" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/utils/ptr" |
| |
| "github.com/apache/apisix-ingress-controller/pkg/utils" |
| "github.com/apache/apisix-ingress-controller/test/e2e/framework" |
| ) |
| |
// APISIXDeployOptions carries the values used to deploy one APISIX data plane.
// deployDataplane hands the whole struct to the APISIX manifest template and
// fills in defaults for any of ServiceName, the two ports and ConfigProvider
// that are left at their zero value.
type APISIXDeployOptions struct {
	// Namespace is the namespace the manifests are applied to.
	// AdminKey is the admin API key given to the data plane.
	Namespace, AdminKey string

	// ServiceName is the name of the Service looked up after deployment.
	// ServiceType is passed through to the template unchanged
	// (presumably the Kubernetes Service type — confirm against the template).
	ServiceName, ServiceType string
	// ServiceHTTPPort and ServiceHTTPSPort are the Service ports; when zero,
	// deployDataplane falls back to 80 and 443.
	ServiceHTTPPort, ServiceHTTPSPort int

	// ConfigProvider selects the configuration backend; the etcd provider
	// additionally causes etcd to be deployed.
	ConfigProvider string
	// Replicas is the desired replica count. nil or a positive value makes
	// deployDataplane wait for the pods to become available.
	Replicas *int
}
| |
| type APISIXDeployer struct { |
| *Scaffold |
| adminTunnel *k8s.Tunnel |
| } |
| |
| func NewAPISIXDeployer(s *Scaffold) Deployer { |
| return &APISIXDeployer{ |
| Scaffold: s, |
| } |
| } |
| |
| func (s *APISIXDeployer) BeforeEach() { |
| s.runtimeOpts = s.opts |
| s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", s.runtimeOpts.Name, time.Now().Nanosecond()) |
| s.kubectlOptions = &k8s.KubectlOptions{ |
| ConfigPath: s.runtimeOpts.Kubeconfig, |
| Namespace: s.namespace, |
| } |
| if s.runtimeOpts.ControllerName == "" { |
| s.runtimeOpts.ControllerName = fmt.Sprintf("%s/%s", DefaultControllerName, s.namespace) |
| } |
| |
| s.finalizers = nil |
| |
| // Initialize additionalGateways map |
| s.additionalGateways = make(map[string]*GatewayResources) |
| |
| k8s.CreateNamespace(s.t, s.kubectlOptions, s.namespace) |
| |
| if s.runtimeOpts.APISIXAdminAPIKey == "" { |
| s.runtimeOpts.APISIXAdminAPIKey = getEnvOrDefault("APISIX_ADMIN_KEY", "edd1c9f034335f136f87ad84b625c8f1") |
| } |
| |
| s.Logf("apisix admin api key: %s", s.runtimeOpts.APISIXAdminAPIKey) |
| |
| e := utils.ParallelExecutor{} |
| |
| e.Add(func() { |
| defer GinkgoRecover() |
| s.DeployDataplane(DeployDataplaneOptions{}) |
| s.DeployIngress() |
| adminTunnel, err := s.createAdminTunnel(s.dataplaneService) |
| Expect(err).NotTo(HaveOccurred()) |
| s.adminTunnel = adminTunnel |
| }) |
| e.Add(func() { |
| defer GinkgoRecover() |
| s.DeployTestService() |
| }) |
| e.Wait() |
| } |
| |
| func (s *APISIXDeployer) AfterEach() { |
| if CurrentSpecReport().Failed() { |
| if os.Getenv("TEST_ENV") == "CI" { |
| _, _ = fmt.Fprintln(GinkgoWriter, "Dumping namespace contents") |
| _, _ = k8s.RunKubectlAndGetOutputE(GinkgoT(), s.kubectlOptions, "get", "deploy,sts,svc,pods,gatewayproxy") |
| _, _ = k8s.RunKubectlAndGetOutputE(GinkgoT(), s.kubectlOptions, "describe", "pods") |
| } |
| |
| output := s.GetDeploymentLogs("apisix-ingress-controller") |
| if output != "" { |
| _, _ = fmt.Fprintln(GinkgoWriter, output) |
| } |
| } |
| |
| // Delete all additional gateways |
| for identifier := range s.additionalGateways { |
| err := s.CleanupAdditionalGateway(identifier) |
| Expect(err).NotTo(HaveOccurred(), "cleaning up additional gateway") |
| } |
| |
| for i := len(s.finalizers) - 1; i >= 0; i-- { |
| runWithRecover(s.finalizers[i]) |
| } |
| |
| // if the test case is successful, just delete namespace |
| err := k8s.DeleteNamespaceE(s.t, s.kubectlOptions, s.namespace) |
| Expect(err).NotTo(HaveOccurred(), "deleting namespace "+s.namespace) |
| |
| // Wait for a while to prevent the worker node being overwhelming |
| // (new cases will be run). |
| time.Sleep(3 * time.Second) |
| } |
| |
| func (s *APISIXDeployer) DeployDataplane(deployOpts DeployDataplaneOptions) { |
| opts := APISIXDeployOptions{ |
| Namespace: s.namespace, |
| AdminKey: s.runtimeOpts.APISIXAdminAPIKey, |
| ServiceHTTPPort: 9080, |
| ServiceHTTPSPort: 9443, |
| Replicas: ptr.To(1), |
| } |
| |
| if deployOpts.Namespace != "" { |
| opts.Namespace = deployOpts.Namespace |
| } |
| if deployOpts.ServiceType != "" { |
| opts.ServiceType = deployOpts.ServiceType |
| } |
| if deployOpts.ServiceHTTPPort != 0 { |
| opts.ServiceHTTPPort = deployOpts.ServiceHTTPPort |
| } |
| if deployOpts.ServiceHTTPSPort != 0 { |
| opts.ServiceHTTPSPort = deployOpts.ServiceHTTPSPort |
| } |
| if deployOpts.AdminKey != "" { |
| opts.AdminKey = deployOpts.AdminKey |
| } |
| if deployOpts.Replicas != nil { |
| opts.Replicas = deployOpts.Replicas |
| } |
| |
| for _, close := range []func(){ |
| s.closeAdminTunnel, |
| } { |
| close() |
| } |
| |
| if s.apisixTunnels != nil { |
| s.apisixTunnels.Close() |
| } |
| |
| svc := s.deployDataplane(&opts) |
| s.dataplaneService = svc |
| |
| if opts.Replicas != nil && *opts.Replicas == 0 { |
| deployOpts.SkipCreateTunnels = true |
| } |
| if !deployOpts.SkipCreateTunnels { |
| err := s.newAPISIXTunnels(opts.ServiceName) |
| Expect(err).ToNot(HaveOccurred(), "creating apisix tunnels") |
| } |
| } |
| |
| func (s *APISIXDeployer) newAPISIXTunnels(serviceName string) error { |
| apisixTunnels, err := s.createDataplaneTunnels(s.dataplaneService, s.kubectlOptions, serviceName) |
| if err != nil { |
| return err |
| } |
| |
| s.apisixTunnels = apisixTunnels |
| return nil |
| } |
| |
| func (s *APISIXDeployer) deployDataplane(opts *APISIXDeployOptions) *corev1.Service { |
| if opts.ServiceName == "" { |
| opts.ServiceName = framework.ProviderType |
| } |
| |
| if opts.ServiceHTTPPort == 0 { |
| opts.ServiceHTTPPort = 80 |
| } |
| |
| if opts.ServiceHTTPSPort == 0 { |
| opts.ServiceHTTPSPort = 443 |
| } |
| |
| kubectlOpts := k8s.NewKubectlOptions("", "", opts.Namespace) |
| |
| if opts.ConfigProvider == "" { |
| opts.ConfigProvider = framework.ConfigProviderTypeYaml |
| if framework.ProviderType == framework.ProviderTypeAPISIX { |
| opts.ConfigProvider = framework.ConfigProviderTypeEtcd |
| } |
| } |
| |
| if opts.ConfigProvider == framework.ConfigProviderTypeEtcd { |
| k8s.KubectlApplyFromString(s.GinkgoT, kubectlOpts, framework.EtcdSpec) |
| err := framework.WaitPodsAvailable(s.GinkgoT, kubectlOpts, metav1.ListOptions{ |
| LabelSelector: "app=etcd", |
| }) |
| Expect(err).ToNot(HaveOccurred(), "waiting for etcd pod ready") |
| } |
| |
| buf := bytes.NewBuffer(nil) |
| err := framework.APISIXStandaloneTpl.Execute(buf, opts) |
| Expect(err).ToNot(HaveOccurred(), "executing template") |
| |
| k8s.KubectlApplyFromString(s.GinkgoT, kubectlOpts, buf.String()) |
| if opts.Replicas == nil || *opts.Replicas > 0 { |
| err = framework.WaitPodsAvailable(s.GinkgoT, kubectlOpts, metav1.ListOptions{ |
| LabelSelector: "app.kubernetes.io/name=apisix", |
| }) |
| Expect(err).ToNot(HaveOccurred(), "waiting for gateway pod ready") |
| } |
| |
| Eventually(func() bool { |
| svc, err := k8s.GetServiceE(s.GinkgoT, kubectlOpts, opts.ServiceName) |
| if err != nil { |
| s.Logf("failed to get service %s: %v", opts.ServiceName, err) |
| return false |
| } |
| if svc.Spec.Type == corev1.ServiceTypeLoadBalancer { |
| return len(svc.Status.LoadBalancer.Ingress) > 0 |
| } |
| return true |
| }, "20s", "4s").Should(BeTrue(), "waiting for LoadBalancer IP") |
| |
| svc, err := k8s.GetServiceE(s.GinkgoT, kubectlOpts, opts.ServiceName) |
| Expect(err).ToNot(HaveOccurred(), "failed to get service %s: %v", opts.ServiceName, err) |
| return svc |
| } |
| |
| func (s *APISIXDeployer) ScaleDataplane(replicas int) { |
| s.DeployDataplane(DeployDataplaneOptions{ |
| Replicas: ptr.To(replicas), |
| }) |
| } |
| |
| func (s *APISIXDeployer) DeployIngress() { |
| if s.runtimeOpts.EnableWebhook { |
| err := s.SetupWebhookResources() |
| Expect(err).NotTo(HaveOccurred(), "setting up webhook resources") |
| } |
| |
| s.Framework.DeployIngress(framework.IngressDeployOpts{ |
| ControllerName: s.runtimeOpts.ControllerName, |
| ProviderType: framework.ProviderType, |
| ProviderSyncPeriod: 1 * time.Hour, |
| Namespace: s.namespace, |
| Replicas: ptr.To(1), |
| WebhookEnable: s.runtimeOpts.EnableWebhook, |
| }) |
| } |
| |
| func (s *APISIXDeployer) ScaleIngress(replicas int) { |
| s.Framework.DeployIngress(framework.IngressDeployOpts{ |
| ControllerName: s.runtimeOpts.ControllerName, |
| ProviderType: framework.ProviderType, |
| ProviderSyncPeriod: 1 * time.Hour, |
| Namespace: s.namespace, |
| Replicas: ptr.To(replicas), |
| }) |
| } |
| |
// getEnvOrDefault looks up the environment variable named key and returns its
// value. An unset variable and one set to the empty string are treated the
// same: both yield defaultValue.
func getEnvOrDefault(key, defaultValue string) string {
	fromEnv := os.Getenv(key)
	if fromEnv == "" {
		return defaultValue
	}
	return fromEnv
}
| |
| func (s *APISIXDeployer) createAdminTunnel(svc *corev1.Service) (*k8s.Tunnel, error) { |
| var ( |
| adminPort int |
| ) |
| |
| for _, port := range svc.Spec.Ports { |
| switch port.Name { |
| case "admin": |
| adminPort = int(port.Port) |
| } |
| } |
| |
| kubectlOpts := k8s.NewKubectlOptions("", "", svc.Namespace) |
| |
| adminTunnel := k8s.NewTunnel(kubectlOpts, k8s.ResourceTypeService, svc.Name, |
| 0, adminPort) |
| |
| if err := adminTunnel.ForwardPortE(s.t); err != nil { |
| return nil, err |
| } |
| s.addFinalizers(s.closeAdminTunnel) |
| |
| return adminTunnel, nil |
| } |
| |
| func (s *APISIXDeployer) closeAdminTunnel() { |
| if s.adminTunnel != nil { |
| s.adminTunnel.Close() |
| s.adminTunnel = nil |
| } |
| } |
| |
| func (s *APISIXDeployer) CreateAdditionalGateway(namePrefix string) (string, *corev1.Service, error) { |
| return s.CreateAdditionalGatewayWithOptions(namePrefix, DeployDataplaneOptions{}) |
| } |
| |
| func (s *APISIXDeployer) CreateAdditionalGatewayWithOptions(namePrefix string, opts DeployDataplaneOptions) (string, *corev1.Service, error) { |
| // Create a new namespace for this additional gateway |
| additionalNS := fmt.Sprintf("%s-%d", namePrefix, time.Now().Unix()) |
| |
| k8s.CreateNamespace(s.t, s.kubectlOptions, additionalNS) |
| |
| // Create new kubectl options for the new namespace |
| kubectlOpts := &k8s.KubectlOptions{ |
| ConfigPath: s.runtimeOpts.Kubeconfig, |
| Namespace: additionalNS, |
| } |
| |
| s.Logf("additional gateway in namespace %s", additionalNS) |
| |
| // Use the same admin key as the main gateway |
| adminKey := s.runtimeOpts.APISIXAdminAPIKey |
| s.Logf("additional gateway admin api key: %s", adminKey) |
| |
| // Store gateway resources info |
| resources := &GatewayResources{ |
| Namespace: additionalNS, |
| AdminAPIKey: adminKey, |
| } |
| |
| // Deploy dataplane for this additional gateway |
| o := APISIXDeployOptions{ |
| Namespace: additionalNS, |
| AdminKey: adminKey, |
| ServiceHTTPPort: 9080, |
| ServiceHTTPSPort: 9443, |
| } |
| if opts.Namespace != "" { |
| o.Namespace = opts.Namespace |
| } |
| if opts.AdminKey != "" { |
| o.AdminKey = opts.AdminKey |
| } |
| if opts.ServiceHTTPPort != 0 { |
| o.ServiceHTTPPort = opts.ServiceHTTPPort |
| } |
| if opts.ServiceHTTPSPort != 0 { |
| o.ServiceHTTPSPort = opts.ServiceHTTPSPort |
| } |
| if opts.ProviderType != "" { |
| if opts.ProviderType == framework.ProviderTypeAPISIX { |
| o.ConfigProvider = framework.ConfigProviderTypeEtcd |
| } else { |
| o.ConfigProvider = framework.ConfigProviderTypeYaml |
| } |
| } |
| svc := s.deployDataplane(&o) |
| |
| resources.DataplaneService = svc |
| |
| // Create tunnels for the dataplane |
| tunnels, err := s.createDataplaneTunnels(svc, kubectlOpts, svc.Name) |
| if err != nil { |
| return "", nil, err |
| } |
| |
| resources.Tunnels = tunnels |
| |
| // Use namespace as identifier for APISIX deployments |
| identifier := additionalNS |
| |
| // Store in the map |
| s.additionalGateways[identifier] = resources |
| |
| return identifier, svc, nil |
| } |
| |
| func (s *APISIXDeployer) CleanupAdditionalGateway(identifier string) error { |
| resources, exists := s.additionalGateways[identifier] |
| if !exists { |
| return fmt.Errorf("gateway %s not found", identifier) |
| } |
| |
| // Close tunnels if they exist |
| resources.Tunnels.Close() |
| |
| // Delete the namespace |
| err := k8s.DeleteNamespaceE(s.t, &k8s.KubectlOptions{ |
| ConfigPath: s.runtimeOpts.Kubeconfig, |
| Namespace: resources.Namespace, |
| }, resources.Namespace) |
| |
| // Remove from the map |
| delete(s.additionalGateways, identifier) |
| |
| return err |
| } |
| |
| func (s *APISIXDeployer) GetAdminEndpoint(svc ...*corev1.Service) string { |
| if len(svc) == 0 { |
| return fmt.Sprintf("http://%s.%s:9180", s.dataplaneService.Name, s.dataplaneService.Namespace) |
| } |
| return fmt.Sprintf("http://%s.%s:9180", svc[0].Name, svc[0].Namespace) |
| } |
| |
| func (s *APISIXDeployer) GetAdminServiceName() string { |
| return s.dataplaneService.Name |
| } |
| func (s *APISIXDeployer) DefaultDataplaneResource() DataplaneResource { |
| return newADCDataplaneResource( |
| framework.ProviderType, |
| fmt.Sprintf("http://%s", s.adminTunnel.Endpoint()), |
| s.AdminKey(), |
| false, // tlsVerify |
| ) |
| } |