| // Licensed to Apache Software Foundation (ASF) under one or more contributor |
| // license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright |
| // ownership. Apache Software Foundation (ASF) licenses this file to you under |
| // the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| |
| package setup |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "os" |
| "regexp" |
| "strconv" |
| "strings" |
| |
| "github.com/docker/go-connections/nat" |
| "github.com/testcontainers/testcontainers-go/wait" |
| |
| "github.com/apache/skywalking-infra-e2e/internal/config" |
| "github.com/apache/skywalking-infra-e2e/internal/logger" |
| "github.com/apache/skywalking-infra-e2e/internal/util" |
| |
| "github.com/docker/docker/api/types" |
| "github.com/docker/docker/api/types/filters" |
| "github.com/docker/docker/client" |
| "github.com/docker/docker/pkg/stdcopy" |
| |
| "github.com/testcontainers/testcontainers-go" |
| ) |
| |
| const ( |
| // SeparatorV1 is the separator used in docker-compose v1 |
| // refer to https://github.com/docker/compose/blob/5becea4ca9f68875334c92f191a13482bcd6e5cf/compose/service.py#L1492-L1498 |
| SeparatorV1 = "_" |
| // SeparatorV2 is the separator used in docker-compose v2 |
| // refer to https://github.com/docker/compose/blob/981aea674d052ee1ab252f71c3ca1f9f8a7e32de/pkg/compose/convergence.go#L252-L257 |
| SeparatorV2 = "-" |
| ) |
| |
| var ( |
| containerNamePattern = regexp.MustCompile(`.*_(?P<containerNum>\d+)$`) |
| ) |
| |
| // ComposeSetup sets up environment according to e2e.yaml. |
| func ComposeSetup(e2eConfig *config.E2EConfig) error { |
| composeConfigPath := e2eConfig.Setup.GetFile() |
| if composeConfigPath == "" { |
| return fmt.Errorf("no compose config file was provided") |
| } |
| |
| // build docker client |
| cli, err := client.NewClientWithOpts(client.FromEnv) |
| if err != nil { |
| return err |
| } |
| |
| // setup docker compose |
| composeFilePaths := []string{ |
| composeConfigPath, |
| } |
| identifier := GetIdentity() |
| compose := testcontainers.NewLocalDockerCompose(composeFilePaths, identifier) |
| |
| // bind wait port |
| services, err := buildComposeServices(e2eConfig, compose) |
| if err != nil { |
| return fmt.Errorf("bind wait ports error: %v", err) |
| } |
| |
| // build command |
| cmd := make([]string, 0) |
| if e2eConfig.Setup.InitSystemEnvironment != "" { |
| profilePath := util.ResolveAbs(e2eConfig.Setup.InitSystemEnvironment) |
| cmd = append(cmd, "--env-file", profilePath) |
| util.ExportEnvVars(profilePath) |
| } |
| cmd = append(cmd, "up", "-d") |
| |
| // Listen container create |
| listener := NewComposeContainerListener(context.Background(), cli, services) |
| defer listener.Stop() |
| err = listener.Listen(func(container *ComposeContainer) { |
| if err = exposeComposeLog(cli, container.Service, container.ID, logFollower); err == nil { |
| container.Service.beenFollowLog = true |
| } |
| }) |
| if err != nil { |
| return err |
| } |
| |
| // setup |
| execError := compose.WithCommand(cmd).Invoke() |
| if execError.Error != nil { |
| return execError.Error |
| } |
| |
| // find exported port and build env |
| err = exposeComposeService(services, cli, identifier, e2eConfig) |
| if err != nil { |
| return err |
| } |
| |
| // run steps |
| err = RunStepsAndWait(e2eConfig.Setup.Steps, e2eConfig.Setup.GetTimeout(), nil) |
| if err != nil { |
| logger.Log.Errorf("execute steps error: %v", err) |
| return err |
| } |
| |
| return nil |
| } |
| |
| type ComposeService struct { |
| Name string |
| waitStrategies []*hostPortCachedStrategy |
| beenFollowLog bool |
| } |
| |
| func exposeComposeService(services []*ComposeService, cli *client.Client, |
| identity string, e2eConfig *config.E2EConfig) error { |
| dockerProvider := &DockerProvider{client: cli} |
| |
| // find exported port and build env |
| for _, service := range services { |
| // expose port |
| if err := exposeComposePort(dockerProvider, service, cli, identity, e2eConfig); err != nil { |
| return err |
| } |
| |
| // if service log not follow, expose log |
| if !service.beenFollowLog { |
| c, err := service.FindContainer(cli, identity) |
| if err != nil { |
| logger.Log.Warn(err) |
| continue |
| } |
| if err := exposeComposeLog(dockerProvider.client, service, c.ID, logFollower); err != nil { |
| return err |
| } |
| service.beenFollowLog = true |
| } |
| } |
| return nil |
| } |
| |
| func (c *ComposeService) FindContainer(cli *client.Client, identity string) (*types.Container, error) { |
| serviceName, num := getInstanceName(c.Name) |
| return findContainer(cli, identity, serviceName, num) |
| } |
| |
| func exposeComposePort(dockerProvider *DockerProvider, service *ComposeService, cli *client.Client, identity string, |
| e2eConfig *config.E2EConfig) error { |
| if len(service.waitStrategies) == 0 { |
| return nil |
| } |
| |
| // get real ip address for access and export to env |
| host, err := dockerProvider.daemonHost(context.Background()) |
| if err != nil { |
| return err |
| } |
| |
| container, err := service.FindContainer(cli, identity) |
| if err != nil { |
| return err |
| } |
| |
| // format: <service_name>_host |
| if err := exportComposeEnv(fmt.Sprintf("%s_host", service.Name), host, service.Name); err != nil { |
| return err |
| } |
| |
| for inx := range service.waitStrategies { |
| for _, containerPort := range container.Ports { |
| if int(containerPort.PrivatePort) != service.waitStrategies[inx].expectPort { |
| continue |
| } |
| |
| if err := waitPortUntilReady(e2eConfig, container, dockerProvider, service.waitStrategies[inx].expectPort); err != nil { |
| return err |
| } |
| |
| // expose env config to env |
| // format: <service_name>_<port> |
| if err := exportComposeEnv( |
| fmt.Sprintf("%s_%d", service.Name, containerPort.PrivatePort), |
| fmt.Sprintf("%d", containerPort.PublicPort), |
| service.Name); err != nil { |
| return err |
| } |
| break |
| } |
| } |
| |
| return nil |
| } |
| |
| // export container log to local path |
| func exposeComposeLog(cli *client.Client, service *ComposeService, containerID string, logFollower *util.ResourceLogFollower) error { |
| logs, err := cli.ContainerLogs(logFollower.Ctx, containerID, types.ContainerLogsOptions{ |
| ShowStdout: true, |
| ShowStderr: true, |
| Follow: true, |
| Details: false, |
| }) |
| if err != nil { |
| return err |
| } |
| writer, err := logFollower.BuildLogWriter(fmt.Sprintf("%s/std.log", service.Name)) |
| if err != nil { |
| return err |
| } |
| |
| go func() { |
| defer writer.Close() |
| if _, err := stdcopy.StdCopy(writer, writer, logs); err != nil && !errors.Is(err, context.Canceled) { |
| logger.Log.Warnf("write %s std log error: %v", service.Name, err) |
| } |
| }() |
| return nil |
| } |
| |
| func exportComposeEnv(key, value, service string) error { |
| err := os.Setenv(key, value) |
| if err != nil { |
| return fmt.Errorf("could not set env for %s, %v", service, err) |
| } |
| logger.Log.Infof("export %s=%s", key, value) |
| return nil |
| } |
| |
| func buildComposeServices(e2eConfig *config.E2EConfig, compose *testcontainers.LocalDockerCompose) ([]*ComposeService, error) { |
| waitTimeout := e2eConfig.Setup.GetTimeout() |
| services := make([]*ComposeService, 0) |
| for service, content := range compose.Services { |
| serviceConfig := content.(map[any]any) |
| ports := serviceConfig["ports"] |
| serviceContext := &ComposeService{Name: service} |
| services = append(services, serviceContext) |
| if ports == nil { |
| continue |
| } |
| |
| portList := ports.([]any) |
| for inx := range portList { |
| exportPort, err := getExpectPort(portList[inx]) |
| if err != nil { |
| return nil, err |
| } |
| |
| strategy := &hostPortCachedStrategy{ |
| expectPort: exportPort, |
| HostPortStrategy: *wait.NewHostPortStrategy(nat.Port(fmt.Sprintf("%d/tcp", exportPort))).WithStartupTimeout(waitTimeout), |
| } |
| // temporary don't use testcontainers-go framework wait strategy until fix docker-in-docker bug |
| // compose.WithExposedService(service, exportPort, strategy) |
| serviceContext.waitStrategies = append(serviceContext.waitStrategies, strategy) |
| } |
| } |
| return services, nil |
| } |
| |
| func getExpectPort(portConfig any) (int, error) { |
| switch conf := portConfig.(type) { |
| case int: |
| return conf, nil |
| case string: |
| portInfo := strings.Split(conf, ":") |
| if len(portInfo) > 1 { |
| return strconv.Atoi(portInfo[1]) |
| } |
| return strconv.Atoi(portInfo[0]) |
| } |
| return 0, fmt.Errorf("unknown port information: %v", portConfig) |
| } |
| |
| func findContainer(c *client.Client, projectName, serviceName string, number int) (*types.Container, error) { |
| nameV1 := strings.Join([]string{projectName, serviceName, strconv.Itoa(number)}, SeparatorV1) |
| nameV2 := strings.Join([]string{projectName, serviceName, strconv.Itoa(number)}, SeparatorV2) |
| // filter either names |
| // 1) {project}_{service}_{number} |
| // 2) {project}-{service}-{number} |
| f := filters.NewArgs(filters.Arg("name", nameV1), filters.Arg("name", nameV2)) |
| containerListOptions := types.ContainerListOptions{Filters: f} |
| containers, err := c.ContainerList(context.Background(), containerListOptions) |
| if err != nil { |
| return nil, err |
| } |
| |
| if len(containers) == 0 { |
| return nil, fmt.Errorf("could not found container: %s(docker-compose v1) or %s(docker-compose v2)", nameV1, nameV2) |
| } |
| return &containers[0], nil |
| } |
| |
| func getInstanceName(serviceName string) (service string, number int) { |
| matches := containerNamePattern.FindStringSubmatch(serviceName) |
| if len(matches) == 0 { |
| return serviceName, 1 |
| } |
| numberStr := matches[0] |
| number, err := strconv.Atoi(numberStr) |
| if err != nil { |
| return serviceName, 1 |
| } |
| return serviceName, number |
| } |
| |
| // hostPortCachedStrategy cached original target |
| type hostPortCachedStrategy struct { |
| wait.HostPortStrategy |
| expectPort int |
| target wait.StrategyTarget |
| } |
| |
| func (hp *hostPortCachedStrategy) WaitUntilReady(ctx context.Context, target wait.StrategyTarget) error { |
| hp.target = target |
| return hp.HostPortStrategy.WaitUntilReady(ctx, target) |
| } |
| |
| func waitPortUntilReady(e2eConfig *config.E2EConfig, container *types.Container, dockerProvider *DockerProvider, expectPort int) error { |
| // wait port |
| waitTimeout := e2eConfig.Setup.GetTimeout() |
| waitPort := nat.Port(fmt.Sprintf("%d/tcp", expectPort)) |
| target := &DockerContainer{ |
| ID: container.ID, |
| WaitingFor: wait.NewHostPortStrategy(waitPort), |
| provider: dockerProvider} |
| return WaitPort(context.Background(), target, waitPort, waitTimeout) |
| } |