blob: f74578aed5714b7cb12ef170f5c538ff1ad43e34 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package environment
import (
"fmt"
"io/fs"
"os"
"path/filepath"
"reflect"
"testing"
"time"
pb "beam.apache.org/playground/backend/internal/api/v1"
)
// Test fixtures: JSON executor configurations that setup writes to
// <configFolderName>/<SDK name><jsonExt>, plus a few shared test values.
const (
// javaConfig is the content of the Java SDK config file. Note that the
// classpath entries ("-classpath", "bin:") carry no jars here.
javaConfig = "{\n \"compile_cmd\": \"javac\",\n \"run_cmd\": \"java\",\n \"test_cmd\": \"java\",\n \"compile_args\": [\n \"-d\",\n \"bin\",\n \"-parameters\",\n \"-classpath\"\n ],\n \"run_args\": [\n \"-cp\",\n \"bin:\"\n ],\n \"test_args\": [\n \"-cp\",\n \"bin:\",\n \"org.junit.runner.JUnitCore\"\n ]\n}"
// goConfig is the content of the Go SDK config file (empty run command).
goConfig = "{\n \"compile_cmd\": \"go\",\n \"run_cmd\": \"\",\n \"test_cmd\": \"go\",\n \"compile_args\": [\n \"build\",\n \"-o\",\n \"bin\"\n ],\n \"run_args\": [\n ],\n \"test_args\": [\n \"test\",\n \"-v\"\n ]\n}\n"
// pythonConfig is the content of the Python SDK config file (no compile step, no args).
pythonConfig = "{\n \"compile_cmd\": \"\",\n \"run_cmd\": \"python3\",\n \"test_cmd\": \"pytest\",\n \"compile_args\": [],\n \"run_args\": [],\n \"test_args\": []\n}\n"
// scioConfig is the content of the SCIO SDK config file (sbt with "runMain").
scioConfig = "{\n \"compile_cmd\": \"\",\n \"run_cmd\": \"sbt\",\n \"test_cmd\": \"sbt\",\n \"compile_args\": [],\n \"run_args\": [\n \"runMain\"\n ],\n \"test_args\": []\n}\n"
// defaultProjectId is the (empty) project id used when building expected ApplicationEnvs.
defaultProjectId = ""
// dirPermission is the mode passed to os.WriteFile for the fixture files.
// NOTE(review): despite the name it is applied to files, not directories
// (the folder itself is created with fs.ModePerm in setup).
dirPermission = 0600
)
// executorConfig is the expected Java executor configuration. It is assigned
// in setup, where the string returned by ConcatBeamJarsToString is appended to
// the classpath arguments, and is used as the wanted value in several tests.
var executorConfig *ExecutorConfig
// TestMain is the package test entry point: it prepares the on-disk fixtures
// through setup, runs every test, and removes the fixtures again through the
// deferred teardown. A setup failure aborts the run with a panic.
func TestMain(m *testing.M) {
	if setupErr := setup(); setupErr != nil {
		panic(fmt.Errorf("error during test setup: %s", setupErr.Error()))
	}

	// Registered only after a successful setup, mirroring the fixture lifetime.
	defer teardown()

	m.Run()
}
// setup prepares everything the tests in this package rely on:
//
//   - creates the config folder and writes one JSON executor config per SDK
//     (Java, Go, Python, SCIO) into it;
//   - clears the process environment so tests start from a known state;
//   - builds the expected Java executor config (classpath arguments extended
//     with the string returned by ConcatBeamJarsToString) and stores it in the
//     package-level executorConfig.
//
// It returns the first error encountered; every file write is checked.
func setup() error {
	if err := os.MkdirAll(configFolderName, fs.ModePerm); err != nil {
		return fmt.Errorf("creating config folder %q: %w", configFolderName, err)
	}

	fixtures := []struct {
		sdk     pb.Sdk
		content string
	}{
		{sdk: pb.Sdk_SDK_JAVA, content: javaConfig},
		{sdk: pb.Sdk_SDK_GO, content: goConfig},
		{sdk: pb.Sdk_SDK_PYTHON, content: pythonConfig},
		{sdk: pb.Sdk_SDK_SCIO, content: scioConfig},
	}
	for _, fixture := range fixtures {
		configPath := filepath.Join(configFolderName, fixture.sdk.String()+jsonExt)
		// Previously only the last write was checked; a failed earlier write
		// surfaced later as a confusing test failure instead of a setup error.
		if err := os.WriteFile(configPath, []byte(fixture.content), dirPermission); err != nil {
			return fmt.Errorf("writing config %q: %w", configPath, err)
		}
	}

	os.Clearenv()

	jars, err := ConcatBeamJarsToString()
	if err != nil {
		return err
	}
	executorConfig = NewExecutorConfig(
		"javac", "java", "java",
		[]string{"-d", "bin", "-parameters", "-classpath", jars},
		[]string{"-cp", "bin:" + jars},
		[]string{"-cp", "bin:" + jars, "org.junit.runner.JUnitCore"},
	)
	return nil
}
// teardown removes the config folder (and every fixture file in it) created
// for the tests. It panics if the folder cannot be removed.
func teardown() {
	if err := os.RemoveAll(configFolderName); err != nil {
		// The message names teardown so a cleanup failure is not mistaken
		// for a setup failure.
		panic(fmt.Errorf("error during test teardown: %s", err.Error()))
	}
}
// setOsEnvs exports every key/value pair of envsToSet into the process
// environment. It stops at, and returns, the first error reported by
// os.Setenv; a nil or empty map is a no-op that returns nil.
func setOsEnvs(envsToSet map[string]string) error {
	for name := range envsToSet {
		err := os.Setenv(name, envsToSet[name])
		if err == nil {
			continue
		}
		return err
	}
	return nil
}
// TestNewEnvironment checks that NewEnvironment assembles an Environment whose
// network, Beam SDK and application parts equal the values it was given.
func TestNewEnvironment(t *testing.T) {
	execCfg := NewExecutorConfig("javac", "java", "java", []string{""}, []string{""}, []string{""})
	modDir := ""

	// Each builder returns a fresh value, so the expectation and the input
	// passed to NewEnvironment are constructed independently.
	newNetwork := func() *NetworkEnvs {
		return NewNetworkEnvs(defaultIp, defaultPort, defaultProtocol)
	}
	newBeam := func() *BeamEnvs {
		return NewBeamEnvs(defaultSdk, defaultBeamVersion, execCfg, modDir, 0)
	}
	newApp := func() *ApplicationEnvs {
		return NewApplicationEnvs(
			"/app",
			defaultLaunchSite,
			defaultProjectId,
			defaultPipelinesFolder,
			defaultSDKConfigPath,
			defaultPropertyPath,
			defaultKafkaEmulatorExecutablePath,
			defaultDatasetsPath,
			defaultCleanupSnippetsFunctionsUrl,
			defaultPutSnippetFunctionsUrl,
			defaultIncrementSnippetViewsFunctionsUrl,
			&CacheEnvs{
				defaultCacheType,
				defaultCacheAddress,
				defaultCacheKeyExpirationTime,
			},
			defaultPipelineExecuteTimeout,
			defaultCacheRequestTimeout,
		)
	}

	cases := []struct {
		name string
		want *Environment
	}{
		{
			name: "Create env service with default envs",
			want: &Environment{
				NetworkEnvs:     *newNetwork(),
				BeamSdkEnvs:     *newBeam(),
				ApplicationEnvs: *newApp(),
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := NewEnvironment(*newNetwork(), *newBeam(), *newApp())
			if reflect.DeepEqual(got, tc.want) {
				return
			}
			t.Errorf("NewEnvironment() = %v, want %v", got, tc.want)
		})
	}
}
// Test_getSdkEnvsFromOsEnvs checks which BeamEnvs ConfigureBeamEnvs produces
// for different values of the Beam SDK environment variable: unset, explicitly
// unspecified, a known SDK (Java) and an unknown SDK name.
//
// Every subtest clears the process environment when it finishes, so the cases
// do not depend on their order in the table.
func Test_getSdkEnvsFromOsEnvs(t *testing.T) {
	workingDir := "./"
	preparedModDir := ""
	tests := []struct {
		name      string
		want      *BeamEnvs
		envsToSet map[string]string
		wantErr   bool
	}{
		{
			name:      "Not specified beam sdk key in os envs",
			want:      NewBeamEnvs(pb.Sdk_SDK_UNSPECIFIED, defaultBeamVersion, nil, preparedModDir, defaultNumOfParallelJobs),
			envsToSet: map[string]string{},
			wantErr:   false,
		},
		{
			name:      "Default beam envs",
			want:      NewBeamEnvs(pb.Sdk_SDK_UNSPECIFIED, defaultBeamVersion, nil, preparedModDir, defaultNumOfParallelJobs),
			envsToSet: map[string]string{beamSdkKey: "SDK_UNSPECIFIED"},
			wantErr:   false,
		},
		{
			name:      "Specific sdk key in os envs",
			want:      NewBeamEnvs(pb.Sdk_SDK_JAVA, defaultBeamVersion, executorConfig, preparedModDir, defaultNumOfParallelJobs),
			envsToSet: map[string]string{beamSdkKey: "SDK_JAVA"},
			wantErr:   false,
		},
		{
			name:      "Wrong sdk key in os envs",
			want:      NewBeamEnvs(pb.Sdk_SDK_UNSPECIFIED, defaultBeamVersion, nil, preparedModDir, defaultNumOfParallelJobs),
			envsToSet: map[string]string{beamSdkKey: "SDK_J"},
			wantErr:   false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Registered before anything is set so the environment is reset
			// even when the subtest fails or returns early.
			t.Cleanup(os.Clearenv)
			if err := setOsEnvs(tt.envsToSet); err != nil {
				t.Fatalf("couldn't setup os env: %v", err)
			}
			got, err := ConfigureBeamEnvs(workingDir)
			if (err != nil) != tt.wantErr {
				t.Errorf("ConfigureBeamEnvs() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("ConfigureBeamEnvs() got = %v, want %v", got, tt.want)
			}
		})
	}
}
// Test_getNetworkEnvsFromOsEnvs checks GetNetworkEnvsFromOsEnvs with no
// variables set (defaults expected), with IP/port/protocol supplied through
// the environment, and with a non-numeric port (error expected).
//
// Every subtest clears the process environment when it finishes, so values set
// by one case (for example the protocol) never leak into the next one.
func Test_getNetworkEnvsFromOsEnvs(t *testing.T) {
	tests := []struct {
		name      string
		want      *NetworkEnvs
		envsToSet map[string]string
		wantErr   bool
	}{
		{
			name: "Default values",
			want: NewNetworkEnvs(defaultIp, defaultPort, defaultProtocol),
		},
		{
			name:      "Values from os envs",
			want:      NewNetworkEnvs("12.12.12.21", 1234, "TCP"),
			envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1234", protocolTypeKey: "TCP"},
		},
		{
			// The case expects an error, not a fallback to the default port.
			name:      "Not int port in os env, should return an error",
			want:      nil,
			envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1a34"},
			wantErr:   true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Registered first so the environment is reset on every exit path.
			t.Cleanup(os.Clearenv)
			if err := setOsEnvs(tt.envsToSet); err != nil {
				t.Fatalf("couldn't setup os env: %v", err)
			}
			got, err := GetNetworkEnvsFromOsEnvs()
			if (err != nil) != tt.wantErr {
				t.Errorf("getNetworkEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("getNetworkEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
			}
		})
	}
}
// Test_getApplicationEnvsFromOsEnvs checks GetApplicationEnvsFromOsEnvs for:
// a provided and a missing working directory, and valid/invalid duration
// values for the cache key expiration time and the pipeline execute timeout
// (invalid values are expected to fall back to the defaults).
//
// Every subtest clears the process environment when it finishes, including
// when it returns early after an unexpected error.
func Test_getApplicationEnvsFromOsEnvs(t *testing.T) {
	hour := "1h"
	convertedTime, err := time.ParseDuration(hour)
	if err != nil {
		t.Fatalf("couldn't parse duration %q: %v", hour, err)
	}

	// wantEnvs builds the expected ApplicationEnvs; only the two durations
	// vary between the cases, everything else is the package default.
	wantEnvs := func(cacheKeyExpiration, pipelineExecuteTimeout time.Duration) *ApplicationEnvs {
		return NewApplicationEnvs(
			"/app",
			defaultLaunchSite,
			defaultProjectId,
			defaultPipelinesFolder,
			defaultSDKConfigPath,
			defaultPropertyPath,
			defaultKafkaEmulatorExecutablePath,
			defaultDatasetsPath,
			defaultCleanupSnippetsFunctionsUrl,
			defaultPutSnippetFunctionsUrl,
			defaultIncrementSnippetViewsFunctionsUrl,
			&CacheEnvs{
				defaultCacheType,
				defaultCacheAddress,
				cacheKeyExpiration,
			},
			pipelineExecuteTimeout,
			defaultCacheRequestTimeout,
		)
	}

	tests := []struct {
		name      string
		want      *ApplicationEnvs
		wantErr   bool
		envsToSet map[string]string
	}{
		{
			name:      "Working dir is provided",
			want:      wantEnvs(defaultCacheKeyExpirationTime, defaultPipelineExecuteTimeout),
			wantErr:   false,
			envsToSet: map[string]string{workingDirKey: "/app", launchSiteKey: defaultLaunchSite, projectIdKey: defaultProjectId},
		},
		{
			name:    "Working dir isn't provided",
			want:    nil,
			wantErr: true,
		},
		{
			name:      "CacheKeyExpirationTimeKey is set with the correct value",
			want:      wantEnvs(convertedTime, defaultPipelineExecuteTimeout),
			wantErr:   false,
			envsToSet: map[string]string{workingDirKey: "/app", cacheKeyExpirationTimeKey: hour},
		},
		{
			name:      "CacheKeyExpirationTimeKey is set with the incorrect value",
			want:      wantEnvs(defaultCacheKeyExpirationTime, defaultPipelineExecuteTimeout),
			wantErr:   false,
			envsToSet: map[string]string{workingDirKey: "/app", cacheKeyExpirationTimeKey: "1"},
		},
		{
			// Previously mislabelled as a second "CacheKeyExpirationTimeKey"
			// case, which also produced a duplicate subtest name.
			name:      "PipelineExecuteTimeoutKey is set with the correct value",
			want:      wantEnvs(defaultCacheKeyExpirationTime, convertedTime),
			wantErr:   false,
			envsToSet: map[string]string{workingDirKey: "/app", pipelineExecuteTimeoutKey: hour},
		},
		{
			name:      "PipelineExecuteTimeoutKey is set with the incorrect value",
			want:      wantEnvs(defaultCacheKeyExpirationTime, defaultPipelineExecuteTimeout),
			wantErr:   false,
			envsToSet: map[string]string{workingDirKey: "/app", pipelineExecuteTimeoutKey: "1"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Registered first: the old trailing os.Clearenv() was skipped by
			// the early return below, leaking variables into the next case.
			t.Cleanup(os.Clearenv)
			if err := setOsEnvs(tt.envsToSet); err != nil {
				t.Fatalf("couldn't setup os env: %v", err)
			}
			got, err := GetApplicationEnvsFromOsEnvs()
			if (err != nil) != tt.wantErr {
				t.Errorf("getApplicationEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("getApplicationEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
			}
		})
	}
}
func Test_createExecutorConfig(t *testing.T) {
type args struct {
apacheBeamSdk pb.Sdk
configPath string
}
tests := []struct {
name string
args args
want *ExecutorConfig
wantErr bool
}{
{
name: "Create executor configuration from json file",
args: args{apacheBeamSdk: pb.Sdk_SDK_JAVA, configPath: filepath.Join(configFolderName, pb.Sdk_SDK_JAVA.String()+jsonExt)},
want: executorConfig,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := createExecutorConfig(tt.args.apacheBeamSdk, tt.args.configPath)
if (err != nil) != tt.wantErr {
t.Errorf("createExecutorConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("createExecutorConfig() got = %v\n, want %v\n", got, tt.want)
}
})
}
}
func Test_getConfigFromJson(t *testing.T) {
type args struct {
configPath string
}
tests := []struct {
name string
args args
want *ExecutorConfig
wantErr bool
}{
{
name: "Get object from json",
args: args{filepath.Join(configFolderName, pb.Sdk_SDK_JAVA.String()+jsonExt)},
want: NewExecutorConfig("javac", "java", "java", []string{"-d", "bin", "-parameters", "-classpath"}, []string{"-cp", "bin:"}, []string{"-cp", "bin:", "org.junit.runner.JUnitCore"}),
wantErr: false,
},
{
name: "Error if wrong json path",
args: args{filepath.Join("wrong_folder", pb.Sdk_SDK_JAVA.String()+jsonExt)},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getConfigFromJson(tt.args.configPath)
if (err != nil) != tt.wantErr {
t.Errorf("getConfigFromJson() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getConfigFromJson() got = %v, want %v", got, tt.want)
}
})
}
}
// TestConfigureBeamEnvs checks ConfigureBeamEnvs for the Go, Python and SCIO
// SDKs: the Go SDK without a prepared mod dir (error expected), a working
// directory without config files (error expected), and the handling of valid,
// negative and non-numeric values for the number of parallel jobs.
//
// Each case lists every environment variable it relies on and the environment
// is cleared after each subtest. Previously the cases that expect modDir only
// passed because the prepared-mod-dir variable leaked from the earlier Go
// case, and nothing was cleared at the end of the test.
func TestConfigureBeamEnvs(t *testing.T) {
	workingDir := "./"
	modDir := "/modDir"
	goExecutorConfig := NewExecutorConfig(
		"go",
		"",
		"go",
		[]string{"build", "-o", "bin"},
		[]string{},
		[]string{"test", "-v"},
	)
	pythonExecutorConfig := NewExecutorConfig(
		"",
		"python3",
		"pytest",
		[]string{},
		[]string{},
		[]string{},
	)
	scioExecutorConfig := NewExecutorConfig(
		"",
		"sbt",
		"sbt",
		[]string{},
		[]string{"runMain"},
		[]string{},
	)
	type args struct {
		workingDir string
	}
	// NOTE(review): cases without args run with an empty working directory,
	// exactly as before; confirm whether workingDir was intended there.
	tests := []struct {
		name      string
		args      args
		want      *BeamEnvs
		wantErr   bool
		envsToSet map[string]string
	}{
		{
			name:      "PREPARED_MOD_DIR is not specified in the environment for GO sdk",
			args:      args{workingDir: workingDir},
			want:      nil,
			wantErr:   true,
			envsToSet: map[string]string{beamSdkKey: "SDK_GO"},
		},
		{
			name:      "BeamSdkKey set to GO sdk",
			args:      args{workingDir: workingDir},
			want:      NewBeamEnvs(pb.Sdk_SDK_GO, defaultBeamVersion, goExecutorConfig, modDir, defaultNumOfParallelJobs),
			wantErr:   false,
			envsToSet: map[string]string{beamSdkKey: "SDK_GO", preparedModDirKey: modDir},
		},
		{
			name:      "Error during creating executable config",
			args:      args{workingDir: "/app"},
			want:      nil,
			wantErr:   true,
			envsToSet: map[string]string{beamSdkKey: "SDK_PYTHON", preparedModDirKey: modDir},
		},
		{
			name:      "BeamSdkKey set to Python sdk",
			want:      NewBeamEnvs(pb.Sdk_SDK_PYTHON, defaultBeamVersion, pythonExecutorConfig, modDir, defaultNumOfParallelJobs),
			wantErr:   false,
			envsToSet: map[string]string{beamSdkKey: "SDK_PYTHON", preparedModDirKey: modDir},
		},
		{
			name:      "BeamSdkKey set to SCIO sdk",
			want:      NewBeamEnvs(pb.Sdk_SDK_SCIO, defaultBeamVersion, scioExecutorConfig, modDir, defaultNumOfParallelJobs),
			wantErr:   false,
			envsToSet: map[string]string{beamSdkKey: "SDK_SCIO", preparedModDirKey: modDir},
		},
		{
			name:      "NumOfParallelJobsKey is set with a positive number",
			want:      NewBeamEnvs(pb.Sdk_SDK_PYTHON, defaultBeamVersion, pythonExecutorConfig, modDir, 1),
			wantErr:   false,
			envsToSet: map[string]string{beamSdkKey: "SDK_PYTHON", preparedModDirKey: modDir, numOfParallelJobsKey: "1"},
		},
		{
			name:      "NumOfParallelJobsKey is set with a negative number",
			want:      NewBeamEnvs(pb.Sdk_SDK_PYTHON, defaultBeamVersion, pythonExecutorConfig, modDir, defaultNumOfParallelJobs),
			wantErr:   false,
			envsToSet: map[string]string{beamSdkKey: "SDK_PYTHON", preparedModDirKey: modDir, numOfParallelJobsKey: "-1"},
		},
		{
			name:      "NumOfParallelJobsKey is set with incorrect value",
			want:      NewBeamEnvs(pb.Sdk_SDK_PYTHON, defaultBeamVersion, pythonExecutorConfig, modDir, defaultNumOfParallelJobs),
			wantErr:   false,
			envsToSet: map[string]string{beamSdkKey: "SDK_PYTHON", preparedModDirKey: modDir, numOfParallelJobsKey: "incorrectValue"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Registered first so the environment is reset on every exit path.
			t.Cleanup(os.Clearenv)
			if err := setOsEnvs(tt.envsToSet); err != nil {
				t.Fatalf("couldn't setup os env: %v", err)
			}
			got, err := ConfigureBeamEnvs(tt.args.workingDir)
			if (err != nil) != tt.wantErr {
				t.Errorf("ConfigureBeamEnvs() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("ConfigureBeamEnvs() got = %v, want %v", got, tt.want)
			}
		})
	}
}