Let `compose` support `setup.steps` (#26)

diff --git a/examples/compose/e2e.yaml b/examples/compose/e2e.yaml
index 459a586..6b16685 100644
--- a/examples/compose/e2e.yaml
+++ b/examples/compose/e2e.yaml
@@ -19,6 +19,18 @@
   env: compose
   file: docker-compose.yml
   timeout: 1200
+  steps:
+    - name: install yq
+      command: |
+        if ! type "yq" > /dev/null; then
+          mkdir -p /tmp/skywalking-infra-e2e/bin && cd /tmp/skywalking-infra-e2e
+          mkdir yq && cd yq
+          wget -O yq.tar.gz https://github.com/mikefarah/yq/archive/v4.11.1.tar.gz
+          tar -zxf yq.tar.gz --strip=1
+          go install && go build -ldflags -s && mv yq ../bin/yq
+          export PATH="$PATH:/tmp/skywalking-infra-e2e/bin"
+          echo "success to install yq"
+        fi
 
 cleanup:
   # always never success failure
diff --git a/internal/components/setup/common.go b/internal/components/setup/common.go
index 7d1213a..1e93b9d 100644
--- a/internal/components/setup/common.go
+++ b/internal/components/setup/common.go
@@ -24,11 +24,118 @@
 	"os"
 	"time"
 
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/kubernetes"
+
 	"github.com/apache/skywalking-infra-e2e/internal/config"
+	"github.com/apache/skywalking-infra-e2e/internal/constant"
 	"github.com/apache/skywalking-infra-e2e/internal/logger"
 	"github.com/apache/skywalking-infra-e2e/internal/util"
 )
 
+func RunStepsAndWait(steps []config.Step, timeout int, k8sCluster *util.K8sClusterInfo) error {
+	var waitTimeout time.Duration
+	if timeout <= 0 {
+		waitTimeout = constant.DefaultWaitTimeout
+	} else {
+		waitTimeout = time.Duration(timeout) * time.Second
+	}
+	logger.Log.Debugf("wait timeout is %d seconds", int(waitTimeout.Seconds()))
+
+	// record time now
+	timeNow := time.Now()
+
+	for _, step := range steps {
+		logger.Log.Infof("processing setup step [%s]", step.Name)
+
+		if step.Path != "" && step.Command == "" {
+			if k8sCluster == nil {
+				return fmt.Errorf("not support path")
+			}
+			manifest := config.Manifest{
+				Path:  step.Path,
+				Waits: step.Waits,
+			}
+			err := createManifestAndWait(k8sCluster.Client, k8sCluster.Interface, manifest, waitTimeout)
+			if err != nil {
+				return err
+			}
+		} else if step.Command != "" && step.Path == "" {
+			command := config.Run{
+				Command: step.Command,
+				Waits:   step.Waits,
+			}
+
+			err := RunCommandsAndWait(command, waitTimeout)
+			if err != nil {
+				return err
+			}
+		} else {
+			return fmt.Errorf("step parameter error, one Path or one Command should be specified, but got %+v", step)
+		}
+
+		waitTimeout = NewTimeout(timeNow, waitTimeout)
+		timeNow = time.Now()
+
+		if waitTimeout <= 0 {
+			return fmt.Errorf("setup timeout")
+		}
+	}
+	return nil
+}
+
+// createManifestAndWait creates manifests in k8s cluster and concurrent waits according to the manifests' wait conditions.
+func createManifestAndWait(c *kubernetes.Clientset, dc dynamic.Interface, manifest config.Manifest, timeout time.Duration) error {
+	waitSet := util.NewWaitSet(timeout)
+
+	kubeConfigYaml, err := ioutil.ReadFile(kubeConfigPath)
+	if err != nil {
+		return err
+	}
+
+	waits := manifest.Waits
+	err = createByManifest(c, dc, manifest)
+	if err != nil {
+		return err
+	}
+
+	// len() for nil slices is defined as zero
+	if len(waits) == 0 {
+		logger.Log.Info("no wait-for strategy is provided")
+		return nil
+	}
+
+	for idx := range waits {
+		wait := waits[idx]
+		logger.Log.Infof("waiting for %+v", wait)
+
+		options, err := getWaitOptions(kubeConfigYaml, &wait)
+		if err != nil {
+			return err
+		}
+
+		waitSet.WaitGroup.Add(1)
+		go concurrentlyWait(&wait, options, waitSet)
+	}
+
+	go func() {
+		waitSet.WaitGroup.Wait()
+		close(waitSet.FinishChan)
+	}()
+
+	select {
+	case <-waitSet.FinishChan:
+		logger.Log.Infof("create and wait for manifest ready success")
+	case err := <-waitSet.ErrChan:
+		logger.Log.Errorf("failed to wait for manifest to be ready")
+		return err
+	case <-time.After(waitSet.Timeout):
+		return fmt.Errorf("wait for manifest ready timeout after %d seconds", int(timeout.Seconds()))
+	}
+
+	return nil
+}
+
 // RunCommandsAndWait Concurrently run commands and wait for conditions.
 func RunCommandsAndWait(run config.Run, timeout time.Duration) error {
 	waitSet := util.NewWaitSet(timeout)
diff --git a/internal/components/setup/compose.go b/internal/components/setup/compose.go
index 07ec57c..9ba463c 100644
--- a/internal/components/setup/compose.go
+++ b/internal/components/setup/compose.go
@@ -62,6 +62,55 @@
 	compose := testcontainers.NewLocalDockerCompose(composeFilePaths, identifier)
 
 	// bind wait port
+	serviceWithPorts, err := bindWaitPort(e2eConfig, compose)
+	if err != nil {
+		return fmt.Errorf("bind wait ports error: %v", err)
+	}
+
+	execError := compose.WithCommand([]string{"up", "-d"}).Invoke()
+	if execError.Error != nil {
+		return execError.Error
+	}
+
+	// find exported port and build env
+	for service, portList := range serviceWithPorts {
+		container, err2 := findContainer(cli, fmt.Sprintf("%s_%s", identifier, getInstanceName(service)))
+		if err2 != nil {
+			return err2
+		}
+		containerPorts := container.Ports
+
+		for inx := range portList {
+			for _, containerPort := range containerPorts {
+				if int(containerPort.PrivatePort) != portList[inx] {
+					continue
+				}
+
+				// expose env config to env
+				// format: <service_name>_<port>
+				envKey := fmt.Sprintf("%s_%d", service, containerPort.PrivatePort)
+				envValue := fmt.Sprintf("%d", containerPort.PublicPort)
+				err2 = os.Setenv(envKey, envValue)
+				if err2 != nil {
+					return fmt.Errorf("could not set env for %s:%d, %v", service, portList[inx], err2)
+				}
+				logger.Log.Infof("expose env : %s : %s", envKey, envValue)
+				break
+			}
+		}
+	}
+
+	// run steps
+	err = RunStepsAndWait(e2eConfig.Setup.Steps, e2eConfig.Setup.Timeout, nil)
+	if err != nil {
+		logger.Log.Errorf("execute steps error: %v", err)
+		return err
+	}
+
+	return nil
+}
+
+func bindWaitPort(e2eConfig *config.E2EConfig, compose *testcontainers.LocalDockerCompose) (map[string][]int, error) {
 	timeout := e2eConfig.Setup.Timeout
 	var waitTimeout time.Duration
 	if timeout <= 0 {
@@ -82,7 +131,7 @@
 		for inx := range portList {
 			exportPort, err := getExpectPort(portList[inx])
 			if err != nil {
-				return err
+				return nil, err
 			}
 			serviceWithPorts[service] = append(serviceWithPorts[service], exportPort)
 
@@ -92,41 +141,7 @@
 				wait.NewHostPortStrategy(nat.Port(fmt.Sprintf("%d/tcp", exportPort))).WithStartupTimeout(waitTimeout))
 		}
 	}
-
-	execError := compose.WithCommand([]string{"up", "-d"}).Invoke()
-	if execError.Error != nil {
-		return execError.Error
-	}
-
-	// find exported port and build env
-	for service, portList := range serviceWithPorts {
-		container, err := findContainer(cli, fmt.Sprintf("%s_%s", identifier, getInstanceName(service)))
-		if err != nil {
-			return err
-		}
-		containerPorts := container.Ports
-
-		for inx := range portList {
-			for _, containerPort := range containerPorts {
-				if int(containerPort.PrivatePort) != portList[inx] {
-					continue
-				}
-
-				// expose env config to env
-				// format: <service_name>_<port>
-				envKey := fmt.Sprintf("%s_%d", service, containerPort.PrivatePort)
-				envValue := fmt.Sprintf("%d", containerPort.PublicPort)
-				err = os.Setenv(envKey, envValue)
-				if err != nil {
-					return fmt.Errorf("could not set env for %s:%d, %v", service, portList[inx], err)
-				}
-				logger.Log.Infof("expose env : %s : %s", envKey, envValue)
-				break
-			}
-		}
-	}
-
-	return nil
+	return serviceWithPorts, nil
 }
 
 func getExpectPort(portConfig interface{}) (int, error) {
diff --git a/internal/components/setup/kind.go b/internal/components/setup/kind.go
index 1c8e0f1..e50ccd9 100644
--- a/internal/components/setup/kind.go
+++ b/internal/components/setup/kind.go
@@ -20,16 +20,13 @@
 
 import (
 	"fmt"
-	"io/ioutil"
 	"os"
 	"strings"
-	"time"
 
 	"k8s.io/cli-runtime/pkg/genericclioptions"
-	ctlwait "k8s.io/kubectl/pkg/cmd/wait"
-
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
+	ctlwait "k8s.io/kubectl/pkg/cmd/wait"
 
 	"github.com/apache/skywalking-infra-e2e/internal/config"
 
@@ -69,58 +66,19 @@
 		return err
 	}
 
-	c, dc, err := util.ConnectToK8sCluster(kubeConfigPath)
+	cluster, err := util.ConnectToK8sCluster(kubeConfigPath)
 	if err != nil {
 		logger.Log.Errorf("connect to k8s cluster failed according to config file: %s", kubeConfigPath)
 		return err
 	}
 
-	// run commands and manifests
-	timeout := e2eConfig.Setup.Timeout
-	var waitTimeout time.Duration
-	if timeout <= 0 {
-		waitTimeout = constant.DefaultWaitTimeout
-	} else {
-		waitTimeout = time.Duration(timeout) * time.Second
+	// run steps
+	err = RunStepsAndWait(e2eConfig.Setup.Steps, e2eConfig.Setup.Timeout, cluster)
+	if err != nil {
+		logger.Log.Errorf("execute steps error: %v", err)
+		return err
 	}
-	logger.Log.Debugf("wait timeout is %d seconds", int(waitTimeout.Seconds()))
 
-	// record time now
-	timeNow := time.Now()
-
-	for _, step := range e2eConfig.Setup.Steps {
-		logger.Log.Infof("processing setup step [%s]", step.Name)
-
-		if step.Path != "" && step.Command == "" {
-			manifest := config.Manifest{
-				Path:  step.Path,
-				Waits: step.Waits,
-			}
-			err = createManifestAndWait(c, dc, manifest, waitTimeout)
-			if err != nil {
-				return err
-			}
-		} else if step.Command != "" && step.Path == "" {
-			command := config.Run{
-				Command: step.Command,
-				Waits:   step.Waits,
-			}
-
-			err := RunCommandsAndWait(command, waitTimeout)
-			if err != nil {
-				return err
-			}
-		} else {
-			return fmt.Errorf("step parameter error, one Path or one Command should be specified, but got %+v", step)
-		}
-
-		waitTimeout = NewTimeout(timeNow, waitTimeout)
-		timeNow = time.Now()
-
-		if waitTimeout <= 0 {
-			return fmt.Errorf("kind setup timeout")
-		}
-	}
 	return nil
 }
 
@@ -138,58 +96,6 @@
 	return nil
 }
 
-// createManifestAndWait creates manifests in k8s cluster and concurrent waits according to the manifests' wait conditions.
-func createManifestAndWait(c *kubernetes.Clientset, dc dynamic.Interface, manifest config.Manifest, timeout time.Duration) error {
-	waitSet := util.NewWaitSet(timeout)
-
-	kubeConfigYaml, err := ioutil.ReadFile(kubeConfigPath)
-	if err != nil {
-		return err
-	}
-
-	waits := manifest.Waits
-	err = createByManifest(c, dc, manifest)
-	if err != nil {
-		return err
-	}
-
-	// len() for nil slices is defined as zero
-	if len(waits) == 0 {
-		logger.Log.Info("no wait-for strategy is provided")
-		return nil
-	}
-
-	for idx := range waits {
-		wait := waits[idx]
-		logger.Log.Infof("waiting for %+v", wait)
-
-		options, err := getWaitOptions(kubeConfigYaml, &wait)
-		if err != nil {
-			return err
-		}
-
-		waitSet.WaitGroup.Add(1)
-		go concurrentlyWait(&wait, options, waitSet)
-	}
-
-	go func() {
-		waitSet.WaitGroup.Wait()
-		close(waitSet.FinishChan)
-	}()
-
-	select {
-	case <-waitSet.FinishChan:
-		logger.Log.Infof("create and wait for manifest ready success")
-	case err := <-waitSet.ErrChan:
-		logger.Log.Errorf("failed to wait for manifest to be ready")
-		return err
-	case <-time.After(waitSet.Timeout):
-		return fmt.Errorf("wait for manifest ready timeout after %d seconds", int(timeout.Seconds()))
-	}
-
-	return nil
-}
-
 func getWaitOptions(kubeConfigYaml []byte, wait *config.Wait) (options *ctlwait.WaitOptions, err error) {
 	if strings.Contains(wait.Resource, "/") && wait.LabelSelector != "" {
 		return nil, fmt.Errorf("when passing resource.group/resource.name in Resource, the labelSelector can not be set at the same time")
diff --git a/internal/util/k8s.go b/internal/util/k8s.go
index 9a1f435..45aa3f8 100644
--- a/internal/util/k8s.go
+++ b/internal/util/k8s.go
@@ -44,25 +44,31 @@
 	"github.com/apache/skywalking-infra-e2e/internal/logger"
 )
 
+// K8sClusterInfo created when connect to cluster
+type K8sClusterInfo struct {
+	Client    *kubernetes.Clientset
+	Interface dynamic.Interface
+}
+
 // ConnectToK8sCluster gets clientSet and dynamic client from k8s config file.
-func ConnectToK8sCluster(kubeConfigPath string) (c *kubernetes.Clientset, dc dynamic.Interface, err error) {
+func ConnectToK8sCluster(kubeConfigPath string) (info *K8sClusterInfo, err error) {
 	config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
-	c, err = kubernetes.NewForConfig(config)
+	c, err := kubernetes.NewForConfig(config)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
-	dc, err = dynamic.NewForConfig(config)
+	dc, err := dynamic.NewForConfig(config)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
 	logger.Log.Info("connect to k8s cluster succeeded")
 
-	return c, dc, nil
+	return &K8sClusterInfo{c, dc}, nil
 }
 
 // GetManifests recursively gets all yml and yaml files from manifests string.