| // Licensed to Apache Software Foundation (ASF) under one or more contributor |
| // license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright |
| // ownership. Apache Software Foundation (ASF) licenses this file to you under |
| // the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| |
| package setup |
| |
| import ( |
| "context" |
| "fmt" |
| "os" |
| "strings" |
| "time" |
| |
| "github.com/apache/skywalking-infra-e2e/internal/config" |
| "github.com/apache/skywalking-infra-e2e/internal/logger" |
| "github.com/apache/skywalking-infra-e2e/internal/util" |
| ) |
| |
| var ( |
| logFollower *util.ResourceLogFollower |
| ) |
| |
| func RunStepsAndWait(steps []config.Step, waitTimeout time.Duration, k8sCluster *util.K8sClusterInfo) error { |
| logger.Log.Debugf("wait timeout is %v", waitTimeout.String()) |
| |
| // record time now |
| timeNow := time.Now() |
| |
| for _, step := range steps { |
| logger.Log.Infof("processing setup step [%s]", step.Name) |
| |
| if step.Path != "" && step.Command == "" { |
| if k8sCluster == nil { |
| return fmt.Errorf("not support path") |
| } |
| manifest := config.Manifest{ |
| Path: step.Path, |
| Waits: step.Waits, |
| } |
| err := createManifestAndWait(k8sCluster, manifest, waitTimeout) |
| if err != nil { |
| return err |
| } |
| } else if step.Command != "" && step.Path == "" { |
| command := config.Run{ |
| Command: step.Command, |
| Waits: step.Waits, |
| } |
| |
| err := RunCommandsAndWait(command, waitTimeout, k8sCluster) |
| if err != nil { |
| return err |
| } |
| } else { |
| return fmt.Errorf("step parameter error, one Path or one Command should be specified, but got %+v", step) |
| } |
| |
| waitTimeout = NewTimeout(timeNow, waitTimeout) |
| timeNow = time.Now() |
| |
| if waitTimeout <= 0 { |
| return fmt.Errorf("setup timeout") |
| } |
| } |
| return nil |
| } |
| |
| // createManifestAndWait creates manifests in k8s cluster and concurrent waits according to the manifests' wait conditions. |
| func createManifestAndWait(c *util.K8sClusterInfo, manifest config.Manifest, timeout time.Duration) error { |
| waitSet := util.NewWaitSet(timeout) |
| |
| waits := manifest.Waits |
| err := createByManifest(c, manifest) |
| if err != nil { |
| return err |
| } |
| |
| // len() for nil slices is defined as zero |
| if len(waits) == 0 { |
| logger.Log.Info("no wait-for strategy is provided") |
| return nil |
| } |
| |
| for idx := range waits { |
| wait := waits[idx] |
| logger.Log.Infof("waiting for %+v", wait) |
| |
| options, err := getWaitOptions(c, &wait) |
| if err != nil { |
| return err |
| } |
| |
| waitSet.WaitGroup.Add(1) |
| go concurrentlyWait(&wait, options, waitSet) |
| } |
| |
| go func() { |
| waitSet.WaitGroup.Wait() |
| close(waitSet.FinishChan) |
| }() |
| |
| select { |
| case <-waitSet.FinishChan: |
| logger.Log.Infof("create and wait for manifest ready success") |
| case err := <-waitSet.ErrChan: |
| logger.Log.Errorf("failed to wait for manifest to be ready") |
| return err |
| case <-time.After(waitSet.Timeout): |
| return fmt.Errorf("wait for manifest ready timeout after %d seconds", int(timeout.Seconds())) |
| } |
| |
| return nil |
| } |
| |
| // RunCommandsAndWait Concurrently run commands and wait for conditions. |
| func RunCommandsAndWait(run config.Run, timeout time.Duration, cluster *util.K8sClusterInfo) error { |
| waitSet := util.NewWaitSet(timeout) |
| |
| commands := run.Command |
| if len(commands) < 1 { |
| return nil |
| } |
| |
| waitSet.WaitGroup.Add(1) |
| go executeCommandsAndWait(commands, run.Waits, waitSet, cluster) |
| |
| go func() { |
| waitSet.WaitGroup.Wait() |
| close(waitSet.FinishChan) |
| }() |
| |
| select { |
| case <-waitSet.FinishChan: |
| logger.Log.Infof("all commands executed successfully") |
| case err := <-waitSet.ErrChan: |
| logger.Log.Errorf("execute command error") |
| return err |
| case <-time.After(waitSet.Timeout): |
| return fmt.Errorf("wait for commands run timeout after %d seconds", int(timeout.Seconds())) |
| } |
| |
| return nil |
| } |
| |
| func executeCommandsAndWait(commands string, waits []config.Wait, waitSet *util.WaitSet, cluster *util.K8sClusterInfo) { |
| defer waitSet.WaitGroup.Done() |
| |
| // executes commands |
| logger.Log.Infof("executing commands [%s]", strings.ReplaceAll(commands, "\n", "\\n")) |
| result, stderr, err := util.ExecuteCommand(commands) |
| if err != nil { |
| err = fmt.Errorf("commands: [%s] runs error: %s", strings.ReplaceAll(commands, "\n", "\\n"), stderr) |
| waitSet.ErrChan <- err |
| } |
| logger.Log.Infof("executed commands [%s], result: %s", strings.ReplaceAll(commands, "\n", "\\n"), result) |
| |
| // waits for conditions meet |
| for idx := range waits { |
| wait := waits[idx] |
| logger.Log.Infof("waiting for %+v", wait) |
| |
| options, err := getWaitOptions(cluster, &wait) |
| if err != nil { |
| err = fmt.Errorf("commands: [%s] get wait options error: %s", commands, err) |
| waitSet.ErrChan <- err |
| } |
| |
| err = options.RunWait() |
| if err != nil { |
| err = fmt.Errorf("commands: [%s] waits error: %s", commands, err) |
| waitSet.ErrChan <- err |
| return |
| } |
| logger.Log.Infof("wait %+v condition met", wait) |
| } |
| } |
| |
| // NewTimeout calculates new timeout since timeBefore. |
| func NewTimeout(timeBefore time.Time, timeout time.Duration) time.Duration { |
| elapsed := time.Since(timeBefore) |
| newTimeout := timeout - elapsed |
| return newTimeout |
| } |
| |
| func GetIdentity() string { |
| runID := os.Getenv("GITHUB_RUN_ID") |
| if runID == "" { |
| return "skywalking_e2e" |
| } |
| return runID |
| } |
| |
| func InitLogFollower() { |
| logFollower = util.NewResourceLogFollower(context.Background(), util.LogDir) |
| } |
| |
| func CloseLogFollower() { |
| if logFollower != nil { |
| logFollower.Close() |
| } |
| } |