Let `compose` support `setup.steps` (#26)
diff --git a/examples/compose/e2e.yaml b/examples/compose/e2e.yaml
index 459a586..6b16685 100644
--- a/examples/compose/e2e.yaml
+++ b/examples/compose/e2e.yaml
@@ -19,6 +19,18 @@
env: compose
file: docker-compose.yml
timeout: 1200
+ steps:
+ - name: install yq
+ command: |
+ if ! type "yq" > /dev/null; then
+ mkdir -p /tmp/skywalking-infra-e2e/bin && cd /tmp/skywalking-infra-e2e
+ mkdir yq && cd yq
+ wget -O yq.tar.gz https://github.com/mikefarah/yq/archive/v4.11.1.tar.gz
+ tar -zxf yq.tar.gz --strip=1
+ go install && go build -ldflags -s && mv yq ../bin/yq
+ export PATH="$PATH:/tmp/skywalking-infra-e2e/bin"
+ echo "success to install yq"
+ fi
cleanup:
# always never success failure
diff --git a/internal/components/setup/common.go b/internal/components/setup/common.go
index 7d1213a..1e93b9d 100644
--- a/internal/components/setup/common.go
+++ b/internal/components/setup/common.go
@@ -24,11 +24,118 @@
"os"
"time"
+ "k8s.io/client-go/dynamic"
+ "k8s.io/client-go/kubernetes"
+
"github.com/apache/skywalking-infra-e2e/internal/config"
+ "github.com/apache/skywalking-infra-e2e/internal/constant"
"github.com/apache/skywalking-infra-e2e/internal/logger"
"github.com/apache/skywalking-infra-e2e/internal/util"
)
+func RunStepsAndWait(steps []config.Step, timeout int, k8sCluster *util.K8sClusterInfo) error {
+ var waitTimeout time.Duration
+ if timeout <= 0 {
+ waitTimeout = constant.DefaultWaitTimeout
+ } else {
+ waitTimeout = time.Duration(timeout) * time.Second
+ }
+ logger.Log.Debugf("wait timeout is %d seconds", int(waitTimeout.Seconds()))
+
+ // record time now
+ timeNow := time.Now()
+
+ for _, step := range steps {
+ logger.Log.Infof("processing setup step [%s]", step.Name)
+
+ if step.Path != "" && step.Command == "" {
+ if k8sCluster == nil {
+ return fmt.Errorf("not support path")
+ }
+ manifest := config.Manifest{
+ Path: step.Path,
+ Waits: step.Waits,
+ }
+ err := createManifestAndWait(k8sCluster.Client, k8sCluster.Interface, manifest, waitTimeout)
+ if err != nil {
+ return err
+ }
+ } else if step.Command != "" && step.Path == "" {
+ command := config.Run{
+ Command: step.Command,
+ Waits: step.Waits,
+ }
+
+ err := RunCommandsAndWait(command, waitTimeout)
+ if err != nil {
+ return err
+ }
+ } else {
+ return fmt.Errorf("step parameter error, one Path or one Command should be specified, but got %+v", step)
+ }
+
+ waitTimeout = NewTimeout(timeNow, waitTimeout)
+ timeNow = time.Now()
+
+ if waitTimeout <= 0 {
+ return fmt.Errorf("setup timeout")
+ }
+ }
+ return nil
+}
+
+// createManifestAndWait creates manifests in k8s cluster and concurrent waits according to the manifests' wait conditions.
+func createManifestAndWait(c *kubernetes.Clientset, dc dynamic.Interface, manifest config.Manifest, timeout time.Duration) error {
+ waitSet := util.NewWaitSet(timeout)
+
+ kubeConfigYaml, err := ioutil.ReadFile(kubeConfigPath)
+ if err != nil {
+ return err
+ }
+
+ waits := manifest.Waits
+ err = createByManifest(c, dc, manifest)
+ if err != nil {
+ return err
+ }
+
+ // len() for nil slices is defined as zero
+ if len(waits) == 0 {
+ logger.Log.Info("no wait-for strategy is provided")
+ return nil
+ }
+
+ for idx := range waits {
+ wait := waits[idx]
+ logger.Log.Infof("waiting for %+v", wait)
+
+ options, err := getWaitOptions(kubeConfigYaml, &wait)
+ if err != nil {
+ return err
+ }
+
+ waitSet.WaitGroup.Add(1)
+ go concurrentlyWait(&wait, options, waitSet)
+ }
+
+ go func() {
+ waitSet.WaitGroup.Wait()
+ close(waitSet.FinishChan)
+ }()
+
+ select {
+ case <-waitSet.FinishChan:
+ logger.Log.Infof("create and wait for manifest ready success")
+ case err := <-waitSet.ErrChan:
+ logger.Log.Errorf("failed to wait for manifest to be ready")
+ return err
+ case <-time.After(waitSet.Timeout):
+ return fmt.Errorf("wait for manifest ready timeout after %d seconds", int(timeout.Seconds()))
+ }
+
+ return nil
+}
+
// RunCommandsAndWait Concurrently run commands and wait for conditions.
func RunCommandsAndWait(run config.Run, timeout time.Duration) error {
waitSet := util.NewWaitSet(timeout)
diff --git a/internal/components/setup/compose.go b/internal/components/setup/compose.go
index 07ec57c..9ba463c 100644
--- a/internal/components/setup/compose.go
+++ b/internal/components/setup/compose.go
@@ -62,6 +62,55 @@
compose := testcontainers.NewLocalDockerCompose(composeFilePaths, identifier)
// bind wait port
+ serviceWithPorts, err := bindWaitPort(e2eConfig, compose)
+ if err != nil {
+ return fmt.Errorf("bind wait ports error: %v", err)
+ }
+
+ execError := compose.WithCommand([]string{"up", "-d"}).Invoke()
+ if execError.Error != nil {
+ return execError.Error
+ }
+
+ // find exported port and build env
+ for service, portList := range serviceWithPorts {
+ container, err2 := findContainer(cli, fmt.Sprintf("%s_%s", identifier, getInstanceName(service)))
+ if err2 != nil {
+ return err2
+ }
+ containerPorts := container.Ports
+
+ for inx := range portList {
+ for _, containerPort := range containerPorts {
+ if int(containerPort.PrivatePort) != portList[inx] {
+ continue
+ }
+
+ // expose env config to env
+ // format: <service_name>_<port>
+ envKey := fmt.Sprintf("%s_%d", service, containerPort.PrivatePort)
+ envValue := fmt.Sprintf("%d", containerPort.PublicPort)
+ err2 = os.Setenv(envKey, envValue)
+ if err2 != nil {
+ return fmt.Errorf("could not set env for %s:%d, %v", service, portList[inx], err2)
+ }
+ logger.Log.Infof("expose env : %s : %s", envKey, envValue)
+ break
+ }
+ }
+ }
+
+ // run steps
+ err = RunStepsAndWait(e2eConfig.Setup.Steps, e2eConfig.Setup.Timeout, nil)
+ if err != nil {
+ logger.Log.Errorf("execute steps error: %v", err)
+ return err
+ }
+
+ return nil
+}
+
+func bindWaitPort(e2eConfig *config.E2EConfig, compose *testcontainers.LocalDockerCompose) (map[string][]int, error) {
timeout := e2eConfig.Setup.Timeout
var waitTimeout time.Duration
if timeout <= 0 {
@@ -82,7 +131,7 @@
for inx := range portList {
exportPort, err := getExpectPort(portList[inx])
if err != nil {
- return err
+ return nil, err
}
serviceWithPorts[service] = append(serviceWithPorts[service], exportPort)
@@ -92,41 +141,7 @@
wait.NewHostPortStrategy(nat.Port(fmt.Sprintf("%d/tcp", exportPort))).WithStartupTimeout(waitTimeout))
}
}
-
- execError := compose.WithCommand([]string{"up", "-d"}).Invoke()
- if execError.Error != nil {
- return execError.Error
- }
-
- // find exported port and build env
- for service, portList := range serviceWithPorts {
- container, err := findContainer(cli, fmt.Sprintf("%s_%s", identifier, getInstanceName(service)))
- if err != nil {
- return err
- }
- containerPorts := container.Ports
-
- for inx := range portList {
- for _, containerPort := range containerPorts {
- if int(containerPort.PrivatePort) != portList[inx] {
- continue
- }
-
- // expose env config to env
- // format: <service_name>_<port>
- envKey := fmt.Sprintf("%s_%d", service, containerPort.PrivatePort)
- envValue := fmt.Sprintf("%d", containerPort.PublicPort)
- err = os.Setenv(envKey, envValue)
- if err != nil {
- return fmt.Errorf("could not set env for %s:%d, %v", service, portList[inx], err)
- }
- logger.Log.Infof("expose env : %s : %s", envKey, envValue)
- break
- }
- }
- }
-
- return nil
+ return serviceWithPorts, nil
}
func getExpectPort(portConfig interface{}) (int, error) {
diff --git a/internal/components/setup/kind.go b/internal/components/setup/kind.go
index 1c8e0f1..e50ccd9 100644
--- a/internal/components/setup/kind.go
+++ b/internal/components/setup/kind.go
@@ -20,16 +20,13 @@
import (
"fmt"
- "io/ioutil"
"os"
"strings"
- "time"
"k8s.io/cli-runtime/pkg/genericclioptions"
- ctlwait "k8s.io/kubectl/pkg/cmd/wait"
-
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
+ ctlwait "k8s.io/kubectl/pkg/cmd/wait"
"github.com/apache/skywalking-infra-e2e/internal/config"
@@ -69,58 +66,19 @@
return err
}
- c, dc, err := util.ConnectToK8sCluster(kubeConfigPath)
+ cluster, err := util.ConnectToK8sCluster(kubeConfigPath)
if err != nil {
logger.Log.Errorf("connect to k8s cluster failed according to config file: %s", kubeConfigPath)
return err
}
- // run commands and manifests
- timeout := e2eConfig.Setup.Timeout
- var waitTimeout time.Duration
- if timeout <= 0 {
- waitTimeout = constant.DefaultWaitTimeout
- } else {
- waitTimeout = time.Duration(timeout) * time.Second
+ // run steps
+ err = RunStepsAndWait(e2eConfig.Setup.Steps, e2eConfig.Setup.Timeout, cluster)
+ if err != nil {
+ logger.Log.Errorf("execute steps error: %v", err)
+ return err
}
- logger.Log.Debugf("wait timeout is %d seconds", int(waitTimeout.Seconds()))
- // record time now
- timeNow := time.Now()
-
- for _, step := range e2eConfig.Setup.Steps {
- logger.Log.Infof("processing setup step [%s]", step.Name)
-
- if step.Path != "" && step.Command == "" {
- manifest := config.Manifest{
- Path: step.Path,
- Waits: step.Waits,
- }
- err = createManifestAndWait(c, dc, manifest, waitTimeout)
- if err != nil {
- return err
- }
- } else if step.Command != "" && step.Path == "" {
- command := config.Run{
- Command: step.Command,
- Waits: step.Waits,
- }
-
- err := RunCommandsAndWait(command, waitTimeout)
- if err != nil {
- return err
- }
- } else {
- return fmt.Errorf("step parameter error, one Path or one Command should be specified, but got %+v", step)
- }
-
- waitTimeout = NewTimeout(timeNow, waitTimeout)
- timeNow = time.Now()
-
- if waitTimeout <= 0 {
- return fmt.Errorf("kind setup timeout")
- }
- }
return nil
}
@@ -138,58 +96,6 @@
return nil
}
-// createManifestAndWait creates manifests in k8s cluster and concurrent waits according to the manifests' wait conditions.
-func createManifestAndWait(c *kubernetes.Clientset, dc dynamic.Interface, manifest config.Manifest, timeout time.Duration) error {
- waitSet := util.NewWaitSet(timeout)
-
- kubeConfigYaml, err := ioutil.ReadFile(kubeConfigPath)
- if err != nil {
- return err
- }
-
- waits := manifest.Waits
- err = createByManifest(c, dc, manifest)
- if err != nil {
- return err
- }
-
- // len() for nil slices is defined as zero
- if len(waits) == 0 {
- logger.Log.Info("no wait-for strategy is provided")
- return nil
- }
-
- for idx := range waits {
- wait := waits[idx]
- logger.Log.Infof("waiting for %+v", wait)
-
- options, err := getWaitOptions(kubeConfigYaml, &wait)
- if err != nil {
- return err
- }
-
- waitSet.WaitGroup.Add(1)
- go concurrentlyWait(&wait, options, waitSet)
- }
-
- go func() {
- waitSet.WaitGroup.Wait()
- close(waitSet.FinishChan)
- }()
-
- select {
- case <-waitSet.FinishChan:
- logger.Log.Infof("create and wait for manifest ready success")
- case err := <-waitSet.ErrChan:
- logger.Log.Errorf("failed to wait for manifest to be ready")
- return err
- case <-time.After(waitSet.Timeout):
- return fmt.Errorf("wait for manifest ready timeout after %d seconds", int(timeout.Seconds()))
- }
-
- return nil
-}
-
func getWaitOptions(kubeConfigYaml []byte, wait *config.Wait) (options *ctlwait.WaitOptions, err error) {
if strings.Contains(wait.Resource, "/") && wait.LabelSelector != "" {
return nil, fmt.Errorf("when passing resource.group/resource.name in Resource, the labelSelector can not be set at the same time")
diff --git a/internal/util/k8s.go b/internal/util/k8s.go
index 9a1f435..45aa3f8 100644
--- a/internal/util/k8s.go
+++ b/internal/util/k8s.go
@@ -44,25 +44,31 @@
"github.com/apache/skywalking-infra-e2e/internal/logger"
)
+// K8sClusterInfo created when connect to cluster
+type K8sClusterInfo struct {
+ Client *kubernetes.Clientset
+ Interface dynamic.Interface
+}
+
// ConnectToK8sCluster gets clientSet and dynamic client from k8s config file.
-func ConnectToK8sCluster(kubeConfigPath string) (c *kubernetes.Clientset, dc dynamic.Interface, err error) {
+func ConnectToK8sCluster(kubeConfigPath string) (info *K8sClusterInfo, err error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
- return nil, nil, err
+ return nil, err
}
- c, err = kubernetes.NewForConfig(config)
+ c, err := kubernetes.NewForConfig(config)
if err != nil {
- return nil, nil, err
+ return nil, err
}
- dc, err = dynamic.NewForConfig(config)
+ dc, err := dynamic.NewForConfig(config)
if err != nil {
- return nil, nil, err
+ return nil, err
}
logger.Log.Info("connect to k8s cluster succeeded")
- return c, dc, nil
+ return &K8sClusterInfo{c, dc}, nil
}
// GetManifests recursively gets all yml and yaml files from manifests string.