blob: cbf7a8d83ac0d52f6f6acdd614efe9aa2320146e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package code_processing
import (
"context"
"encoding/json"
"fmt"
"github.com/google/go-cmp/cmp"
"io/fs"
"os"
"os/exec"
"path/filepath"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/go-redis/redismock/v8"
"github.com/google/uuid"
"go.uber.org/goleak"
pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/cache"
"beam.apache.org/playground/backend/internal/cache/local"
"beam.apache.org/playground/backend/internal/cache/redis"
"beam.apache.org/playground/backend/internal/db/entity"
"beam.apache.org/playground/backend/internal/environment"
"beam.apache.org/playground/backend/internal/executors"
"beam.apache.org/playground/backend/internal/fs_tool"
"beam.apache.org/playground/backend/internal/utils"
"beam.apache.org/playground/backend/internal/validators"
)
// Fixtures shared by the tests and benchmarks in this file: executor
// configuration JSON per SDK, working folder names and sample source code.
const (
// javaConfig is the executor configuration written to the config folder for the Java SDK.
javaConfig = "{\n \"compile_cmd\": \"javac\",\n \"run_cmd\": \"java\",\n \"test_cmd\": \"java\",\n \"compile_args\": [\n \"-d\",\n \"bin\",\n \"-parameters\",\n \"-classpath\"\n ],\n \"run_args\": [\n \"-cp\",\n \"../bin:\"\n ],\n \"test_args\": [\n \"-cp\",\n \"bin:\",\n \"JUnit\"\n ]\n}"
// pythonConfig is the executor configuration for the Python SDK; its compile command is empty.
pythonConfig = "{\n \"compile_cmd\": \"\",\n \"run_cmd\": \"python3\",\n \"compile_args\": [],\n \"run_args\": []\n}"
// goConfig is the executor configuration for the Go SDK; its run command is empty.
goConfig = "{\n \"compile_cmd\": \"go\",\n \"run_cmd\": \"\",\n \"compile_args\": [\n \"build\",\n \"-o\",\n \"bin\"\n ],\n \"run_args\": [\n ]\n}"
// pipelinesFolder is the folder passed to fs_tool.NewLifeCycle; it is removed on teardown.
pipelinesFolder = "executable_files"
// configFolder is the folder that receives the per-SDK JSON configuration files.
configFolder = "configs"
// resourcesFolder is the folder created by setup to hold the graph file.
resourcesFolder = "resources"
// helloWordPython, helloWordGo and helloWordJava are minimal programs used as test input.
helloWordPython = "if __name__ == \"__main__\":\n print(\"Hello world!\")\n"
helloWordGo = "package main\nimport \"fmt\"\nfunc main() {\n fmt.Println(\"hello world\")\n}\n"
helloWordJava = "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(\"Hello world!\");\n }\n}"
// graphFilePath is the file written by setup with placeholder graph content.
graphFilePath = "resources/graph.dot"
// jsonExtension is appended to the SDK name to build a configuration file name.
jsonExtension = ".json"
)
// opt is assigned in TestMain (goleak.IgnoreCurrent) and passed to goleak.VerifyNone by the tests.
var opt goleak.Option
// cacheService is the cache shared by the tests; setup and setupSDK assign it via local.New.
var cacheService cache.Cache
// TestMain is the entry point of the package tests. It builds the shared
// fixtures, stores the goleak option created by goleak.IgnoreCurrent for later
// leak checks, runs the tests, removes the fixtures and exits with the code
// returned by the test run.
func TestMain(m *testing.M) {
	setup()
	opt = goleak.IgnoreCurrent()

	code := m.Run()

	teardown()
	os.Exit(code)
}
// setup prepares everything the tests need before they run: the Java executor
// config file, the resources folder with a graph file, the BEAM_SDK and
// APP_WORK_DIR environment variables, and the shared local cache.
// Any failure panics, because no test can run without these fixtures.
func setup() {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	// Java executor configuration.
	must(os.MkdirAll(configFolder, fs.ModePerm))
	javaConfigPath := filepath.Join(configFolder, fmt.Sprintf("%s%s", pb.Sdk_SDK_JAVA.String(), jsonExtension))
	must(os.WriteFile(javaConfigPath, []byte(javaConfig), 0600))

	// Folder with the graph file.
	must(os.MkdirAll(resourcesFolder, fs.ModePerm))
	must(os.WriteFile(graphFilePath, []byte("graph"), 0600))

	// The current directory becomes the application working directory.
	workDir, err := os.Getwd()
	must(err)

	must(os.Setenv("BEAM_SDK", pb.Sdk_SDK_JAVA.String()))
	must(os.Setenv("APP_WORK_DIR", workDir))

	cacheService = local.New(context.Background())
}
// teardown removes the folders created for the tests (configs, pipelines and
// resources, in that order) and then clears the process environment.
// It panics on the first folder that cannot be removed.
func teardown() {
	for _, dir := range []string{configFolder, pipelinesFolder, resourcesFolder} {
		if err := os.RemoveAll(dir); err != nil {
			panic(fmt.Errorf("error during test teardown: %s", err.Error()))
		}
	}
	os.Clearenv()
}
// Test_Process calls Process for a table of scenarios and compares the status,
// compile output, run output and run error found in the cache afterwards with
// the expected values of each scenario.
func Test_Process(t *testing.T) {
	defer goleak.VerifyNone(t, opt)

	// Fail through t.Fatalf rather than panic so that a broken environment
	// fails this test only and TestMain can still run its teardown.
	appEnvs, err := environment.GetApplicationEnvsFromOsEnvs()
	if err != nil {
		t.Fatalf("error during preparing appEnvs: %s", err)
	}
	sdkJavaEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir())
	if err != nil {
		t.Fatalf("error during preparing sdkEnv: %s", err)
	}
	// The Go environment is a copy of the Java one with only the SDK switched.
	sdkGoEnv := *sdkJavaEnv
	sdkGoEnv.ApacheBeamSdk = pb.Sdk_SDK_GO
	incorrectGoHelloWord := "package main\nimport \"fmt\"\nfunc main() {\n fmt.Println(\"hello world\").\n}\n"

	type args struct {
		ctx context.Context
		appEnv *environment.ApplicationEnvs
		sdkEnv *environment.BeamEnvs
		pipelineId uuid.UUID
		pipelineOptions string
	}
	tests := []struct {
		name string
		createExecFile bool
		code string
		cancelFunc bool
		expectedStatus pb.Status
		expectedRunOutput interface{}
		expectedRunError interface{}
		expectedCompileOutput interface{}
		args args
	}{
		{
			// Test case with calling processCode method with small timeout.
			// As a result status into cache should be set as Status_STATUS_RUN_TIMEOUT.
			name: "Small pipeline execution timeout",
			createExecFile: false,
			code: "",
			cancelFunc: false,
			expectedStatus: pb.Status_STATUS_RUN_TIMEOUT,
			expectedCompileOutput: nil,
			expectedRunOutput: nil,
			expectedRunError: nil,
			args: args{
				ctx: context.Background(),
				appEnv: &environment.ApplicationEnvs{},
				sdkEnv: sdkJavaEnv,
				pipelineId: uuid.New(),
				pipelineOptions: "",
			},
		},
		{
			// Test case with calling processCode method without preparing files with code.
			// As a result status into cache should be set as Status_STATUS_VALIDATION_ERROR.
			name: "Validation failed",
			createExecFile: false,
			code: "",
			cancelFunc: false,
			expectedStatus: pb.Status_STATUS_VALIDATION_ERROR,
			expectedCompileOutput: nil,
			expectedRunOutput: nil,
			expectedRunError: nil,
			args: args{
				ctx: context.Background(),
				appEnv: appEnvs,
				sdkEnv: sdkJavaEnv,
				pipelineId: uuid.New(),
				pipelineOptions: "",
			},
		},
		{
			// Test case with calling processCode method with incorrect code.
			// As a result status into cache should be set as Status_STATUS_COMPILE_ERROR.
			name: "Compilation failed",
			createExecFile: true,
			code: "MOCK_CODE",
			cancelFunc: false,
			expectedStatus: pb.Status_STATUS_COMPILE_ERROR,
			expectedCompileOutput: "error: exit status 1\noutput: %s:1: error: reached end of file while parsing\nMOCK_CODE\n^\n1 error\n",
			expectedRunOutput: nil,
			expectedRunError: nil,
			args: args{
				ctx: context.Background(),
				appEnv: appEnvs,
				sdkEnv: sdkJavaEnv,
				pipelineId: uuid.New(),
				pipelineOptions: "",
			},
		},
		{
			// Test case with calling processCode method with incorrect logic into code.
			// As a result status into cache should be set as Status_STATUS_RUN_ERROR.
			name: "Run failed",
			createExecFile: true,
			code: "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(1/0);\n }\n}",
			cancelFunc: false,
			expectedStatus: pb.Status_STATUS_RUN_ERROR,
			expectedCompileOutput: "",
			expectedRunOutput: "",
			expectedRunError: "error: exit status 1\noutput: Exception in thread \"main\" java.lang.ArithmeticException: / by zero\n\tat HelloWorld.main(%s.java:3)\n",
			args: args{
				ctx: context.Background(),
				appEnv: appEnvs,
				sdkEnv: sdkJavaEnv,
				pipelineId: uuid.New(),
				pipelineOptions: "",
			},
		},
		{
			// Test case with calling processCode with canceling code processing.
			// As a result status into cache should be set as Status_STATUS_CANCELED.
			name: "Cancel",
			createExecFile: true,
			code: "class HelloWorld {\n public static void main(String[] args) {\n while(true){}\n }\n}",
			cancelFunc: true,
			expectedStatus: pb.Status_STATUS_CANCELED,
			expectedCompileOutput: "",
			expectedRunOutput: "",
			expectedRunError: "",
			args: args{
				ctx: context.Background(),
				appEnv: appEnvs,
				sdkEnv: sdkJavaEnv,
				pipelineId: uuid.New(),
				pipelineOptions: "",
			},
		},
		{
			// Test case with calling processCode without any error cases.
			// As a result status into cache should be set as Status_STATUS_FINISHED.
			name: "Processing complete successfully on java sdk",
			createExecFile: true,
			cancelFunc: false,
			code: helloWordJava,
			expectedStatus: pb.Status_STATUS_FINISHED,
			expectedCompileOutput: "",
			expectedRunOutput: "Hello world!\n",
			expectedRunError: "",
			args: args{
				ctx: context.Background(),
				appEnv: appEnvs,
				sdkEnv: sdkJavaEnv,
				pipelineId: uuid.New(),
				pipelineOptions: "",
			},
		},
		{
			// Test case with calling processCode method with incorrect go code.
			// As a result status into cache should be set as Status_STATUS_PREPARATION_ERROR.
			name: "Prepare step failed",
			createExecFile: true,
			code: incorrectGoHelloWord,
			cancelFunc: false,
			expectedStatus: pb.Status_STATUS_PREPARATION_ERROR,
			expectedCompileOutput: nil,
			expectedRunOutput: nil,
			expectedRunError: nil,
			args: args{
				ctx: context.Background(),
				appEnv: appEnvs,
				sdkEnv: &sdkGoEnv,
				pipelineId: uuid.New(),
				pipelineOptions: "",
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			lc, err := fs_tool.NewLifeCycle(pb.Sdk_SDK_JAVA, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder))
			if err != nil {
				// Without this check a failed constructor would surface as a
				// nil dereference on the next line.
				t.Fatalf("error during initialize life cycle: %s", err.Error())
			}
			if err = lc.CreateFolders(); err != nil {
				t.Fatalf("error during prepare folders: %s", err.Error())
			}
			if tt.createExecFile {
				sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
				if err = lc.CreateSourceCodeFiles(sources); err != nil {
					t.Fatalf("error during prepare source code file: %s", err.Error())
				}
			}
			if err = utils.SetToCache(cacheService, tt.args.pipelineId, cache.Canceled, false); err != nil {
				t.Fatal("error during set cancel flag to cache")
			}
			if tt.cancelFunc {
				go func(ctx context.Context, pipelineId uuid.UUID) {
					// to imitate behavior of cancellation
					time.Sleep(5 * time.Second)
					// Error deliberately ignored: if the flag is not stored the
					// status assertion below is expected to report the mismatch.
					_ = cacheService.SetValue(ctx, pipelineId, cache.Canceled, true)
				}(tt.args.ctx, tt.args.pipelineId)
			}

			Process(tt.args.ctx, cacheService, lc, tt.args.pipelineId, tt.args.appEnv, tt.args.sdkEnv, tt.args.pipelineOptions)

			// GetValue errors are ignored on purpose below: cases expecting nil
			// presumably rely on a missing key yielding a nil value.
			status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
			if !reflect.DeepEqual(status, tt.expectedStatus) {
				t.Errorf("processCode() set status: %s, but expectes: %s", status, tt.expectedStatus)
			}

			compileOutput, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.CompileOutput)
			// The expected compile output embeds the absolute source file path.
			if tt.expectedCompileOutput != nil && strings.Contains(tt.expectedCompileOutput.(string), "%s") {
				tt.expectedCompileOutput = fmt.Sprintf(tt.expectedCompileOutput.(string), lc.Paths.AbsoluteSourceFilePath)
			}
			if !reflect.DeepEqual(compileOutput, tt.expectedCompileOutput) {
				t.Errorf("processCode() set compileOutput: %s, but expectes: %s", compileOutput, tt.expectedCompileOutput)
			}

			runOutput, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.RunOutput)
			if !reflect.DeepEqual(runOutput, tt.expectedRunOutput) {
				t.Errorf("processCode() set runOutput: %s, but expectes: %s", runOutput, tt.expectedRunOutput)
			}

			runError, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.RunError)
			// The expected run error embeds the pipeline id (used as the file name).
			if tt.expectedRunError != nil && strings.Contains(tt.expectedRunError.(string), "%s") {
				tt.expectedRunError = fmt.Sprintf(tt.expectedRunError.(string), tt.args.pipelineId)
			}
			if !reflect.DeepEqual(runError, tt.expectedRunError) {
				t.Errorf("processCode() set runError: %s, but expectes: %s", runError, tt.expectedRunError)
			}
		})
	}
}
// TestGetProcessingOutput checks GetProcessingOutput against three cache
// states: a missing key, a value that is not a plain string, and a valid
// string value.
func TestGetProcessingOutput(t *testing.T) {
	defer goleak.VerifyNone(t, opt)
	pipelineId := uuid.New()
	incorrectConvertPipelineId := uuid.New()
	// Fixture failures use t.Fatalf instead of panic so that only this test
	// fails and TestMain can still clean up.
	if err := cacheService.SetValue(context.Background(), pipelineId, cache.RunOutput, "MOCK_RUN_OUTPUT"); err != nil {
		t.Fatalf("error during prepare cache value: %s", err.Error())
	}
	// A cache.SubKey value is stored where a string output is expected.
	if err := cacheService.SetValue(context.Background(), incorrectConvertPipelineId, cache.RunOutput, cache.RunOutput); err != nil {
		t.Fatalf("error during prepare cache value: %s", err.Error())
	}
	type args struct {
		ctx context.Context
		cacheService cache.Cache
		key uuid.UUID
		subKey cache.SubKey
		errorTitle string
	}
	tests := []struct {
		name string
		args args
		want string
		wantErr bool
	}{
		{
			// Test case with calling GetProcessingOutput with pipelineId which doesn't contain run output.
			// As a result, want to receive an error.
			name: "Get run output with incorrect pipelineId",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				key: uuid.New(),
				subKey: cache.RunOutput,
				errorTitle: "",
			},
			want: "",
			wantErr: true,
		},
		{
			// Test case with calling GetProcessingOutput with pipelineId which contains incorrect run output.
			// As a result, want to receive an error.
			name: "Get run output with incorrect run output",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				key: incorrectConvertPipelineId,
				subKey: cache.RunOutput,
				errorTitle: "",
			},
			want: "",
			wantErr: true,
		},
		{
			// Test case with calling GetProcessingOutput with pipelineId which contains run output.
			// As a result, want to receive an expected string.
			name: "Get run output with correct pipelineId",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				key: pipelineId,
				subKey: cache.RunOutput,
				errorTitle: "",
			},
			want: "MOCK_RUN_OUTPUT",
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := GetProcessingOutput(tt.args.ctx, tt.args.cacheService, tt.args.key, tt.args.subKey, tt.args.errorTitle)
			if (err != nil) != tt.wantErr {
				t.Errorf("GetProcessingOutput() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if got != tt.want {
				t.Errorf("GetProcessingOutput() got = %v, want %v", got, tt.want)
			}
		})
	}
}
// TestGetProcessingStatus checks GetProcessingStatus against three cache
// states: a missing key, a value that is not a pb.Status, and a valid status.
func TestGetProcessingStatus(t *testing.T) {
	defer goleak.VerifyNone(t, opt)
	pipelineId := uuid.New()
	incorrectConvertPipelineId := uuid.New()
	// Fixture failures use t.Fatalf instead of panic so that only this test
	// fails and TestMain can still clean up.
	if err := cacheService.SetValue(context.Background(), pipelineId, cache.Status, pb.Status_STATUS_FINISHED); err != nil {
		t.Fatalf("error during prepare cache value: %s", err.Error())
	}
	// A plain string is stored where a pb.Status is expected.
	if err := cacheService.SetValue(context.Background(), incorrectConvertPipelineId, cache.Status, "MOCK_STATUS"); err != nil {
		t.Fatalf("error during prepare cache value: %s", err.Error())
	}
	type args struct {
		ctx context.Context
		cacheService cache.Cache
		key uuid.UUID
		errorTitle string
	}
	tests := []struct {
		name string
		args args
		want pb.Status
		wantErr bool
	}{
		{
			// Test case with calling GetProcessingStatus with pipelineId which doesn't contain status.
			// As a result, want to receive an error.
			name: "Get status with incorrect pipelineId",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				key: uuid.New(),
				errorTitle: "",
			},
			want: pb.Status_STATUS_UNSPECIFIED,
			wantErr: true,
		},
		{
			// Test case with calling GetProcessingStatus with pipelineId which contains incorrect status value in cache.
			// As a result, want to receive an error.
			name: "Get status with incorrect cache value",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				key: incorrectConvertPipelineId,
				errorTitle: "",
			},
			want: pb.Status_STATUS_UNSPECIFIED,
			wantErr: true,
		},
		{
			// Test case with calling GetProcessingStatus with pipelineId which contains status.
			// As a result, want to receive an expected status.
			name: "Get status with correct pipelineId",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				key: pipelineId,
				errorTitle: "",
			},
			want: pb.Status_STATUS_FINISHED,
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := GetProcessingStatus(tt.args.ctx, tt.args.cacheService, tt.args.key, tt.args.errorTitle)
			if (err != nil) != tt.wantErr {
				t.Errorf("GetProcessingStatus() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if got != tt.want {
				t.Errorf("GetProcessingStatus() got = %v, want %v", got, tt.want)
			}
		})
	}
}
// TestGetLastIndex checks GetLastIndex against three cache states: a missing
// key, a value that is not a number, and a valid index.
func TestGetLastIndex(t *testing.T) {
	defer goleak.VerifyNone(t, opt)
	pipelineId := uuid.New()
	incorrectConvertPipelineId := uuid.New()
	// Fixture failures use t.Fatalf instead of panic so that only this test
	// fails and TestMain can still clean up.
	if err := cacheService.SetValue(context.Background(), pipelineId, cache.RunOutputIndex, 2); err != nil {
		t.Fatalf("error during prepare cache value: %s", err.Error())
	}
	// Seed a non-numeric value so the "incorrect cache value" case exercises
	// the conversion failure rather than repeating the missing-key case.
	// NOTE(review): assumes GetLastIndex rejects a string value, as the sibling
	// getters do for mistyped values; confirm against its implementation.
	if err := cacheService.SetValue(context.Background(), incorrectConvertPipelineId, cache.RunOutputIndex, "MOCK_LAST_INDEX"); err != nil {
		t.Fatalf("error during prepare cache value: %s", err.Error())
	}
	type args struct {
		ctx context.Context
		cacheService cache.Cache
		key uuid.UUID
		subKey cache.SubKey
		errorTitle string
	}
	tests := []struct {
		name string
		args args
		want int
		wantErr bool
	}{
		{
			// Test case with calling GetLastIndex with pipelineId which doesn't contain last index.
			// As a result, want to receive an error.
			name: "Get last index with incorrect pipelineId",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				key: uuid.New(),
				subKey: cache.RunOutputIndex,
				errorTitle: "",
			},
			want: 0,
			wantErr: true,
		},
		{
			// Test case with calling GetLastIndex with pipelineId which contains incorrect index value in cache.
			// As a result, want to receive an error.
			name: "Get last index with incorrect cache value",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				key: incorrectConvertPipelineId,
				subKey: cache.RunOutputIndex,
				errorTitle: "",
			},
			want: 0,
			wantErr: true,
		},
		{
			// Test case with calling GetLastIndex with pipelineId which contains last index.
			// As a result, want to receive an expected last index.
			name: "Get last index with correct pipelineId",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				key: pipelineId,
				subKey: cache.RunOutputIndex,
				errorTitle: "",
			},
			want: 2,
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := GetLastIndex(tt.args.ctx, tt.args.cacheService, tt.args.key, tt.args.subKey, tt.args.errorTitle)
			if (err != nil) != tt.wantErr {
				t.Errorf("GetLastIndex() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if got != tt.want {
				t.Errorf("GetLastIndex() got = %v, want %v", got, tt.want)
			}
		})
	}
}
// Test_getRunOrTestCmd verifies that getExecuteCmd builds the run command for
// a regular pipeline and the test command for a unit test. Commands are
// compared by path, arguments, environment and working directory only.
func Test_getRunOrTestCmd(t *testing.T) {
	runExecutor := executors.NewExecutorBuilder().
		WithRunner().
		WithCommand("runCommand").
		WithArgs([]string{"arg1"}).
		WithPipelineOptions([]string{""}).
		Build()
	testExecutor := executors.NewExecutorBuilder().
		WithTestRunner().
		WithCommand("testCommand").
		WithArgs([]string{"arg1"}).
		Build()

	expectedRunCmd := exec.CommandContext(context.Background(), "runCommand", "arg1")
	expectedTestCmd := exec.CommandContext(context.Background(), "testCommand", "arg1")

	type args struct {
		isUnitTest bool
		executor *executors.Executor
		ctxWithTimeout context.Context
	}
	cases := []struct {
		name string
		args args
		want *exec.Cmd
	}{
		{
			// Run executor is configured, so the run command is expected.
			name: "Get run cmd",
			args: args{
				isUnitTest: false,
				executor: &runExecutor,
				ctxWithTimeout: context.Background(),
			},
			want: expectedRunCmd,
		},
		{
			// Test executor is configured, so the test command is expected.
			name: "Get test cmd",
			args: args{
				isUnitTest: true,
				executor: &testExecutor,
				ctxWithTimeout: context.Background(),
			},
			want: expectedTestCmd,
		},
	}

	// Only the fields that describe the command line are compared.
	execComparer := cmp.Comparer(func(a exec.Cmd, b exec.Cmd) bool {
		if a.Path != b.Path || a.Dir != b.Dir {
			return false
		}
		return cmp.Equal(a.Args, b.Args) && cmp.Equal(a.Env, b.Env)
	})

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := getExecuteCmd(tc.args.isUnitTest, tc.args.executor, tc.args.ctxWithTimeout)
			if cmp.Equal(got, tc.want, execComparer) {
				return
			}
			t.Errorf("getExecuteCmd() = '%v', want '%v', diff = %v", got, tc.want, cmp.Diff(got, tc.want, execComparer))
		})
	}
}
// getSdkEnv writes the fixtures for the given SDK (see setupSDK) and returns
// the Beam environment configured from the resulting OS environment.
// It returns a nil environment together with the first error encountered.
func getSdkEnv(sdk pb.Sdk) (*environment.BeamEnvs, error) {
	setupSDK(sdk)

	appEnvs, appErr := environment.GetApplicationEnvsFromOsEnvs()
	if appErr != nil {
		return nil, appErr
	}

	beamEnvs, beamErr := environment.ConfigureBeamEnvs(appEnvs.WorkingDir())
	if beamErr != nil {
		return nil, beamErr
	}
	return beamEnvs, nil
}
// setupSDK prepares the fixtures for one SDK: it creates the config folder,
// writes the SDK's executor config (Java, Python and Go are known; any other
// SDK gets no file), sets BEAM_SDK to the SDK name, resets APP_WORK_DIR and
// PREPARED_MOD_DIR to empty strings and replaces the shared cache with a new
// local one. Any failure panics.
func setupSDK(sdk pb.Sdk) {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	must(os.MkdirAll(configFolder, fs.ModePerm))

	configPath := filepath.Join(configFolder, fmt.Sprintf("%s%s", sdk.String(), jsonExtension))
	configBySdk := map[pb.Sdk]string{
		pb.Sdk_SDK_JAVA:   javaConfig,
		pb.Sdk_SDK_PYTHON: pythonConfig,
		pb.Sdk_SDK_GO:     goConfig,
	}
	if config, known := configBySdk[sdk]; known {
		must(os.WriteFile(configPath, []byte(config), 0600))
	}

	envVars := [][2]string{
		{"BEAM_SDK", sdk.String()},
		{"APP_WORK_DIR", ""},
		{"PREPARED_MOD_DIR", ""},
	}
	for _, kv := range envVars {
		must(os.Setenv(kv[0], kv[1]))
	}

	cacheService = local.New(context.Background())
}
// teardownBenchmarks removes the config and pipelines folders created for the
// benchmarks, panicking on the first folder that cannot be removed.
func teardownBenchmarks() {
	for _, dir := range []string{configFolder, pipelinesFolder} {
		if err := os.RemoveAll(dir); err != nil {
			panic(fmt.Errorf("error during test teardown: %s", err.Error()))
		}
	}
}
// prepareFiles creates a life cycle for the given SDK and pipeline id under
// pipelinesFolder, creates its folders and writes code as the main source
// file. Any failure aborts the benchmark through b.Fatalf.
// NOTE(review): the source file is named "main.java" whatever the SDK is;
// confirm that is intended for the Python and Go benchmarks.
func prepareFiles(b *testing.B, pipelineId uuid.UUID, code string, sdk pb.Sdk) *fs_tool.LifeCycle {
	lifeCycle, err := fs_tool.NewLifeCycle(sdk, pipelineId, pipelinesFolder)
	if err != nil {
		b.Fatalf("error during initializse lc: %s", err.Error())
	}

	if err := lifeCycle.CreateFolders(); err != nil {
		b.Fatalf("error during prepare folders: %s", err.Error())
	}

	mainFile := entity.FileEntity{Name: "main.java", Content: code, IsMain: true}
	if err := lifeCycle.CreateSourceCodeFiles([]entity.FileEntity{mainFile}); err != nil {
		b.Fatalf("error during prepare source code file: %s", err.Error())
	}
	return lifeCycle
}
// Benchmark_ProcessJava measures Process on a Java hello-world program.
// File preparation and cache seeding for each iteration run with the timer
// stopped, so only the Process call is timed.
func Benchmark_ProcessJava(b *testing.B) {
	setupSDK(pb.Sdk_SDK_JAVA)
	defer teardownBenchmarks()

	appEnv, appErr := environment.GetApplicationEnvsFromOsEnvs()
	if appErr != nil {
		b.Fatalf("error during preparing appEnv: %s", appErr)
	}
	sdkEnv, sdkErr := environment.ConfigureBeamEnvs(appEnv.WorkingDir())
	if sdkErr != nil {
		b.Fatalf("error during preparing sdkEnv: %s", sdkErr)
	}

	const code = "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(\"Hello world!\");\n }\n}"
	ctx := context.Background()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Untimed per-iteration preparation.
		b.StopTimer()
		id := uuid.New()
		lifeCycle := prepareFiles(b, id, code, pb.Sdk_SDK_JAVA)
		if err := utils.SetToCache(cacheService, id, cache.Canceled, false); err != nil {
			b.Fatal("error during set cancel flag to cache")
		}
		b.StartTimer()

		Process(ctx, cacheService, lifeCycle, id, appEnv, sdkEnv, "")
	}
}
func Benchmark_ProcessPython(b *testing.B) {
setupSDK(pb.Sdk_SDK_PYTHON)
defer teardownBenchmarks()
appEnv, err := environment.GetApplicationEnvsFromOsEnvs()
if err != nil {
b.Fatalf("error during preparing appEnv: %s", err)
}
sdkEnv, err := environment.ConfigureBeamEnvs(appEnv.WorkingDir())
if err != nil {
b.Fatalf("error during preparing sdkEnv: %s", err)
}
ctx := context.Background()
wordCountCode := "import argparse\nimport logging\nimport re\n\nimport apache_beam as beam\nfrom apache_beam.io import ReadFromText\nfrom apache_beam.io import WriteToText\nfrom apache_beam.options.pipeline_options import PipelineOptions\nfrom apache_beam.options.pipeline_options import SetupOptions\n\n\nclass WordExtractingDoFn(beam.DoFn):\n \"\"\"Parse each line of input text into words.\"\"\"\n def process(self, element):\n \"\"\"Returns an iterator over the words of this element.\n\n The element is a line of text. If the line is blank, note that, too.\n\n Args:\n element: the element being processed\n\n Returns:\n The processed element.\n \"\"\"\n return re.findall(r'[\\w\\']+', element, re.UNICODE)\n\n\ndef run(argv=None, save_main_session=True):\n \"\"\"Main entry point; defines and runs the wordcount pipeline.\"\"\"\n parser = argparse.ArgumentParser()\n parser.add_argument(\n '--input',\n dest='input',\n default='gs://dataflow-samples/shakespeare/kinglear.txt',\n help='Input file to process.')\n parser.add_argument(\n '--output',\n dest='output',\n required=True,\n help='Output file to write results to.')\n known_args, pipeline_args = parser.parse_known_args(argv)\n\n # We use the save_main_session option because one or more DoFn's in this\n # workflow rely on global context (e.g., a module imported at module level).\n pipeline_options = PipelineOptions(pipeline_args)\n pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n\n # The pipeline will be run on exiting the with block.\n with beam.Pipeline(options=pipeline_options) as p:\n\n # Read the text file[pattern] into a PCollection.\n lines = p | 'Read' >> ReadFromText(known_args.input)\n\n counts = (\n lines\n | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))\n | 'PairWithOne' >> beam.Map(lambda x: (x, 1))\n | 'GroupAndSum' >> beam.CombinePerKey(sum))\n\n # Format the counts into a PCollection of strings.\n def format_result(word, count):\n return '%s: %d' % 
(word, count)\n\n output = counts | 'Format' >> beam.MapTuple(format_result)\n\n # Write the output using a \"Write\" transform that has side effects.\n # pylint: disable=expression-not-assigned\n output | 'Write' >> WriteToText(known_args.output)\n\n\nif __name__ == '__main__':\n logging.getLogger().setLevel(logging.INFO)\n run()"
pipelineOptions := "--output t.txt"
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
pipelineId := uuid.New()
lc := prepareFiles(b, pipelineId, wordCountCode, pb.Sdk_SDK_PYTHON)
if err = utils.SetToCache(cacheService, pipelineId, cache.Canceled, false); err != nil {
b.Fatal("error during set cancel flag to cache")
}
b.StartTimer()
Process(ctx, cacheService, lc, pipelineId, appEnv, sdkEnv, pipelineOptions)
}
}
// Benchmark_ProcessGo measures Process on a Go hello-world program.
// File preparation and cache seeding for each iteration run with the timer
// stopped, so only the Process call is timed.
func Benchmark_ProcessGo(b *testing.B) {
	setupSDK(pb.Sdk_SDK_GO)
	defer teardownBenchmarks()

	appEnv, appErr := environment.GetApplicationEnvsFromOsEnvs()
	if appErr != nil {
		b.Fatalf("error during preparing appEnv: %s", appErr)
	}
	sdkEnv, sdkErr := environment.ConfigureBeamEnvs(appEnv.WorkingDir())
	if sdkErr != nil {
		b.Fatalf("error during preparing sdkEnv: %s", sdkErr)
	}

	const code = "package main\n\nimport \"fmt\"\n\nfunc main() {\n fmt.Println(\"Hello world!\")\n}"
	ctx := context.Background()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Untimed per-iteration preparation.
		b.StopTimer()
		id := uuid.New()
		lifeCycle := prepareFiles(b, id, code, pb.Sdk_SDK_GO)
		if err := utils.SetToCache(cacheService, id, cache.Canceled, false); err != nil {
			b.Fatal("error during set cancel flag to cache")
		}
		b.StartTimer()

		Process(ctx, cacheService, lifeCycle, id, appEnv, sdkEnv, "")
	}
}
// Benchmark_GetProcessingOutput measures GetProcessingOutput for a pipeline
// whose run output is already stored in the cache.
func Benchmark_GetProcessingOutput(b *testing.B) {
	id := uuid.New()
	ctx := context.Background()
	if err := cacheService.SetValue(ctx, id, cache.RunOutput, "MOCK_RUN_OUTPUT"); err != nil {
		b.Fatalf("error during prepare cache value: %s", err.Error())
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Results are irrelevant here; only the call cost is measured.
		_, _ = GetProcessingOutput(ctx, cacheService, id, cache.RunOutput, "")
	}
}
// Benchmark_GetProcessingStatus measures GetProcessingStatus for a pipeline
// whose status is already stored in the cache.
func Benchmark_GetProcessingStatus(b *testing.B) {
	id := uuid.New()
	ctx := context.Background()
	if err := cacheService.SetValue(ctx, id, cache.Status, pb.Status_STATUS_FINISHED); err != nil {
		b.Fatalf("error during prepare cache value: %s", err.Error())
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Results are irrelevant here; only the call cost is measured.
		_, _ = GetProcessingStatus(ctx, cacheService, id, "")
	}
}
// Benchmark_GetLastIndex measures GetLastIndex for a pipeline whose run output
// index is already stored in the cache.
func Benchmark_GetLastIndex(b *testing.B) {
	id := uuid.New()
	ctx := context.Background()
	if err := cacheService.SetValue(ctx, id, cache.RunOutputIndex, 5); err != nil {
		b.Fatalf("error during prepare cache value: %s", err.Error())
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Results are irrelevant here; only the call cost is measured.
		_, _ = GetLastIndex(ctx, cacheService, id, cache.RunOutputIndex, "")
	}
}
// Test_validateStep runs validateStep for a valid Java environment and for an
// environment with an unspecified SDK, and inspects how many validation
// results were stored.
func Test_validateStep(t *testing.T) {
	javaSdkEnv, err := getSdkEnv(pb.Sdk_SDK_JAVA)
	if err != nil {
		// t.Fatalf instead of panic: fail this test only and let TestMain clean up.
		t.Fatalf("error during preparing sdkEnv: %s", err)
	}
	incorrectSdkEnv := &environment.BeamEnvs{
		ApacheBeamSdk: pb.Sdk_SDK_UNSPECIFIED,
		ExecutorConfig: nil,
	}
	type args struct {
		ctx context.Context
		cacheService cache.Cache
		pipelineId uuid.UUID
		sdkEnv *environment.BeamEnvs
		pipelineLifeCycleCtx context.Context
		validationResults *sync.Map
	}
	tests := []struct {
		name string
		args args
		want int
		code string
	}{
		{
			name: "Test validation step by checking number of validators",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				pipelineId: uuid.New(),
				sdkEnv: javaSdkEnv,
				pipelineLifeCycleCtx: context.Background(),
				validationResults: &sync.Map{},
			},
			want: 3,
			code: helloWordJava,
		},
		{
			name: "Test validation step with incorrect sdkEnv",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				pipelineId: uuid.New(),
				sdkEnv: incorrectSdkEnv,
				pipelineLifeCycleCtx: context.Background(),
				validationResults: &sync.Map{},
			},
			want: 0,
			code: helloWordJava,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			lc, err := fs_tool.NewLifeCycle(pb.Sdk_SDK_JAVA, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder))
			if err != nil {
				// Without this check a failed constructor would surface as a
				// nil dereference on the next line.
				t.Fatalf("error during initialize life cycle: %s", err.Error())
			}
			if err = lc.CreateFolders(); err != nil {
				t.Fatalf("error during prepare folders: %s", err.Error())
			}
			sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
			if err = lc.CreateSourceCodeFiles(sources); err != nil {
				t.Fatalf("error during prepare source code file: %s", err.Error())
			}
			err = validateStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.validationResults)
			got := syncMapLen(tt.args.validationResults)
			// NOTE(review): the count is only compared when validateStep
			// returned an error, so the successful case (want 3) is never
			// asserted. Confirm the intended condition before tightening it.
			if err != nil && !reflect.DeepEqual(got, tt.want) {
				t.Errorf("validateStep() = %d, want %d", got, tt.want)
			}
		})
	}
}
// Test_prepareStep runs prepareStep for a valid Java environment, for an
// environment with an unspecified SDK and with an already expired life cycle
// context, and checks the status stored in the cache afterwards.
func Test_prepareStep(t *testing.T) {
	javaSdkEnv, err := getSdkEnv(pb.Sdk_SDK_JAVA)
	if err != nil {
		// t.Fatalf instead of panic: fail this test only and let TestMain clean up.
		t.Fatalf("error during preparing sdkEnv: %s", err)
	}
	incorrectSdkEnv := &environment.BeamEnvs{
		ApacheBeamSdk: pb.Sdk_SDK_UNSPECIFIED,
		ExecutorConfig: nil,
	}
	// The same validation results are shared by every case.
	validationResults := sync.Map{}
	validationResults.Store(validators.UnitTestValidatorName, false)
	validationResults.Store(validators.KatasValidatorName, false)
	// A one-nanosecond timeout yields a context that has expired by the time
	// the step uses it.
	pipelineLifeCycleCtx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
	type args struct {
		ctx context.Context
		cacheService cache.Cache
		pipelineId uuid.UUID
		sdkEnv *environment.BeamEnvs
		pipelineLifeCycleCtx context.Context
		validationResults *sync.Map
	}
	tests := []struct {
		name string
		args args
		code string
		expectedStatus pb.Status
	}{
		{
			name: "Test preparer step working without an error",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				pipelineId: uuid.New(),
				sdkEnv: javaSdkEnv,
				pipelineLifeCycleCtx: context.Background(),
				validationResults: &validationResults,
			},
			code: helloWordJava,
			expectedStatus: pb.Status_STATUS_COMPILING,
		},
		{
			name: "Test preparer step working with incorrect sdkEnv",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				pipelineId: uuid.New(),
				sdkEnv: incorrectSdkEnv,
				pipelineLifeCycleCtx: context.Background(),
				validationResults: &validationResults,
			},
			code: "",
			expectedStatus: pb.Status_STATUS_ERROR,
		},
		{
			name: "Error during expired context of the example",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				pipelineId: uuid.New(),
				sdkEnv: javaSdkEnv,
				pipelineLifeCycleCtx: pipelineLifeCycleCtx,
				validationResults: &validationResults,
			},
			code: "",
			expectedStatus: pb.Status_STATUS_RUN_TIMEOUT,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			lc, err := fs_tool.NewLifeCycle(pb.Sdk_SDK_JAVA, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder))
			if err != nil {
				// Without this check a failed constructor would surface as a
				// nil dereference on the next line.
				t.Fatalf("error during initialize life cycle: %s", err.Error())
			}
			if err = lc.CreateFolders(); err != nil {
				t.Fatalf("error during prepare folders: %s", err.Error())
			}
			sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
			// Error ignored as in the original test: some cases pass empty code
			// and the assertion below is on the cached status only.
			_ = lc.CreateSourceCodeFiles(sources)
			// The returned error is not asserted; the resulting status is.
			_ = prepareStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.validationResults, nil)
			status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
			if status != tt.expectedStatus {
				t.Errorf("prepareStep: got status = %v, want %v", status, tt.expectedStatus)
			}
		})
	}
}
// Test_compileStep runs compileStep for the Java and Python environments and
// with an already expired life cycle context, and checks the status stored in
// the cache afterwards.
func Test_compileStep(t *testing.T) {
	// t.Fatalf instead of panic: fail this test only and let TestMain clean up.
	sdkJavaEnv, err := getSdkEnv(pb.Sdk_SDK_JAVA)
	if err != nil {
		t.Fatalf("error during preparing java sdkEnv: %s", err)
	}
	sdkPythonEnv, err := getSdkEnv(pb.Sdk_SDK_PYTHON)
	if err != nil {
		t.Fatalf("error during preparing python sdkEnv: %s", err)
	}
	// A one-nanosecond timeout yields a context that has expired by the time
	// the step uses it.
	pipelineLifeCycleCtx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
	type args struct {
		ctx context.Context
		cacheService cache.Cache
		pipelineId uuid.UUID
		sdkEnv *environment.BeamEnvs
		isUnitTest bool
		pipelineLifeCycleCtx context.Context
	}
	tests := []struct {
		name string
		args args
		code string
		expectedStatus pb.Status
	}{
		{
			name: "Test compilation step finishes successfully on java sdk",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				pipelineId: uuid.New(),
				sdkEnv: sdkJavaEnv,
				isUnitTest: false,
				pipelineLifeCycleCtx: context.Background(),
			},
			code: helloWordJava,
			expectedStatus: pb.Status_STATUS_EXECUTING,
		},
		{
			name: "Test compilation step finishes successfully on python sdk",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				pipelineId: uuid.New(),
				sdkEnv: sdkPythonEnv,
				isUnitTest: false,
				pipelineLifeCycleCtx: context.Background(),
			},
			code: helloWordPython,
			expectedStatus: pb.Status_STATUS_EXECUTING,
		},
		{
			name: "Error during expired context of the example",
			args: args{
				ctx: context.Background(),
				cacheService: cacheService,
				pipelineId: uuid.New(),
				sdkEnv: sdkJavaEnv,
				isUnitTest: false,
				pipelineLifeCycleCtx: pipelineLifeCycleCtx,
			},
			code: helloWordJava,
			expectedStatus: pb.Status_STATUS_RUN_TIMEOUT,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			lc, err := fs_tool.NewLifeCycle(tt.args.sdkEnv.ApacheBeamSdk, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder))
			if err != nil {
				// Without this check a failed constructor would surface as a
				// nil dereference on the next line.
				t.Fatalf("error during initialize life cycle: %s", err.Error())
			}
			if err = lc.CreateFolders(); err != nil {
				t.Fatalf("error during prepare folders: %s", err.Error())
			}
			sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
			// Error ignored as in the original test; the assertion below is on
			// the cached status only.
			_ = lc.CreateSourceCodeFiles(sources)
			// The returned error is not asserted; the resulting status is.
			_ = compileStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.isUnitTest)
			status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
			if status != tt.expectedStatus {
				t.Errorf("compileStep: got status = %v, want %v", status, tt.expectedStatus)
			}
		})
	}
}
// Test_runStep calls runStep for several SDK environments and verifies the
// pipeline status that is stored in the cache afterwards.
func Test_runStep(t *testing.T) {
	// Setup failures are reported through t.Fatalf so that only this test
	// fails; a panic would abort the whole test binary.
	sdkJavaEnv, err := getSdkEnv(pb.Sdk_SDK_JAVA)
	if err != nil {
		t.Fatalf("error during preparing java sdk environment: %s", err.Error())
	}
	sdkPythonEnv, err := getSdkEnv(pb.Sdk_SDK_PYTHON)
	if err != nil {
		t.Fatalf("error during preparing python sdk environment: %s", err.Error())
	}
	sdkGoEnv, err := getSdkEnv(pb.Sdk_SDK_GO)
	if err != nil {
		t.Fatalf("error during preparing go sdk environment: %s", err.Error())
	}
	type args struct {
		ctx                  context.Context
		cacheService         cache.Cache
		pipelineId           uuid.UUID
		isUnitTest           bool
		sdkEnv               *environment.BeamEnvs
		pipelineOptions      string
		pipelineLifeCycleCtx context.Context
		createExecFile       bool
	}
	tests := []struct {
		name           string
		args           args
		code           string
		expectedStatus pb.Status
	}{
		{
			// Test case with calling runStep method on python sdk.
			// cmd.Run returns an error during saving output.
			// As a result, the pipeline status should be Status_STATUS_RUN_ERROR.
			name: "Test run step working on python sdk",
			args: args{
				ctx:                  context.Background(),
				cacheService:         cacheService,
				pipelineId:           uuid.New(),
				isUnitTest:           false,
				sdkEnv:               sdkPythonEnv,
				pipelineOptions:      "",
				pipelineLifeCycleCtx: context.Background(),
				createExecFile:       true,
			},
			code:           helloWordPython,
			expectedStatus: pb.Status_STATUS_RUN_ERROR,
		},
		{
			// Test case with calling runStep method on go sdk.
			// cmd.Run returns an error due to a missing executable file.
			// As a result, the pipeline status should be Status_STATUS_RUN_ERROR.
			name: "Test run step working on go sdk",
			args: args{
				ctx:                  context.Background(),
				cacheService:         cacheService,
				pipelineId:           uuid.New(),
				isUnitTest:           true,
				sdkEnv:               sdkGoEnv,
				pipelineOptions:      "",
				pipelineLifeCycleCtx: context.Background(),
				createExecFile:       true,
			},
			code:           helloWordGo,
			expectedStatus: pb.Status_STATUS_RUN_ERROR,
		},
		{
			// Test case with calling runStep method without preparing files with code.
			// As a result, the pipeline status should be Status_STATUS_ERROR.
			name: "Test run step without preparing files with code",
			args: args{
				ctx:                  context.Background(),
				cacheService:         cacheService,
				pipelineId:           uuid.UUID{},
				isUnitTest:           true,
				sdkEnv:               sdkJavaEnv,
				pipelineOptions:      "",
				pipelineLifeCycleCtx: context.Background(),
				createExecFile:       false,
			},
			code:           helloWordJava,
			expectedStatus: pb.Status_STATUS_ERROR,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// lc is dereferenced below, so a failed construction must stop
			// the case here rather than be ignored.
			lc, err := fs_tool.NewLifeCycle(tt.args.sdkEnv.ApacheBeamSdk, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder))
			if err != nil {
				t.Fatalf("error during creating life cycle: %s", err.Error())
			}
			if tt.args.createExecFile {
				if err := lc.CreateFolders(); err != nil {
					t.Fatalf("error during prepare folders: %s", err.Error())
				}
				sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
				_ = lc.CreateSourceCodeFiles(sources)
			}
			// The error returned by runStep is not asserted; the outcome is
			// checked through the status read from the cache below.
			_ = runStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.isUnitTest, tt.args.sdkEnv, tt.args.pipelineOptions)
			status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
			if status != tt.expectedStatus {
				t.Errorf("runStep() got status = %v, want %v", status, tt.expectedStatus)
			}
		})
	}
}
// syncMapLen reports how many entries syncMap holds.
// sync.Map offers no length method, so the entries are counted by visiting
// each of them once with Range.
func syncMapLen(syncMap *sync.Map) int {
	var count int
	visit := func(key, value interface{}) bool {
		count++
		// Returning true tells Range to keep iterating.
		return true
	}
	syncMap.Range(visit)
	return count
}
// TestGetGraph checks GetGraph for a key that holds a string graph, a key
// that is absent from the cache and a key whose stored value is not a string.
func TestGetGraph(t *testing.T) {
	ctx := context.Background()
	pipelineId1 := uuid.New()
	graph := "GRAPH"
	// A failed fixture write must fail the test; returning silently would
	// report success without running any of the cases below.
	if err := cacheService.SetValue(ctx, pipelineId1, cache.Graph, graph); err != nil {
		t.Fatalf("error during saving graph to the cache: %s", err.Error())
	}
	pipelineId2 := uuid.New()
	// A non-string value is stored on purpose for the conversion-error case.
	if err := cacheService.SetValue(ctx, pipelineId2, cache.Graph, 1); err != nil {
		t.Fatalf("error during saving non-string graph value to the cache: %s", err.Error())
	}
	type args struct {
		ctx          context.Context
		cacheService cache.Cache
		key          uuid.UUID
		errorTitle   string
	}
	tests := []struct {
		name    string
		args    args
		want    string
		wantErr bool
	}{
		{
			name: "Get graph when key exist in cache",
			args: args{
				ctx:          context.Background(),
				cacheService: cacheService,
				key:          pipelineId1,
				errorTitle:   "error",
			},
			want:    graph,
			wantErr: false,
		},
		{
			name: "Get graph when key doesn't exist in cache",
			args: args{
				ctx:          context.Background(),
				cacheService: cacheService,
				key:          uuid.New(),
				errorTitle:   "error",
			},
			want:    "",
			wantErr: true,
		},
		{
			name: "Get graph when value from cache by key couldn't be converted to a string",
			args: args{
				ctx:          context.Background(),
				cacheService: cacheService,
				key:          pipelineId2,
				errorTitle:   "error",
			},
			want:    "",
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := GetGraph(tt.args.ctx, tt.args.cacheService, tt.args.key, tt.args.errorTitle)
			if (err != nil) != tt.wantErr {
				t.Errorf("GetGraph error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if got != tt.want {
				t.Errorf("GetGraph got = %v, want %v", got, tt.want)
			}
		})
	}
}
// Test_processSetupError verifies that processSetupError returns an error
// when it is given a redis cache backed by a mock client whose only HSet
// expectation is configured to fail.
func Test_processSetupError(t *testing.T) {
	client, mock := redismock.NewClientMock()
	pipelineId := uuid.New()
	errorMessage := "MOCK_ERROR"
	type args struct {
		err          error
		pipelineId   uuid.UUID
		cacheService cache.Cache
	}
	tests := []struct {
		name    string
		mocks   func()
		args    args
		wantErr bool
	}{
		{
			name: "Error during HSet operation",
			mocks: func() {
				// The message is passed through an explicit "%s" verb so it is
				// never interpreted as a format string (go vet printf check).
				mock.ExpectHSet(pipelineId.String(), "MOCK_VALUE").SetErr(fmt.Errorf("%s", errorMessage))
			},
			args: args{
				err:          fmt.Errorf("%s", errorMessage),
				pipelineId:   pipelineId,
				cacheService: &redis.Cache{Client: client},
			},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Register the mock expectations before exercising the function.
			tt.mocks()
			if err := processSetupError(tt.args.err, tt.args.pipelineId, tt.args.cacheService); (err != nil) != tt.wantErr {
				t.Errorf("processSetupError() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}
// Test_processErrorWithSavingOutput verifies that processErrorWithSavingOutput
// returns an error when it is given a redis cache backed by a mock client
// whose only HSet expectation is configured to fail.
func Test_processErrorWithSavingOutput(t *testing.T) {
	client, mock := redismock.NewClientMock()
	pipelineId := uuid.New()
	errorMessage := "MOCK_ERROR"
	subKey := cache.RunOutput
	type args struct {
		err          error
		errorOutput  []byte
		pipelineId   uuid.UUID
		subKey       cache.SubKey
		cacheService cache.Cache
		errorTitle   string
		newStatus    pb.Status
	}
	tests := []struct {
		name    string
		mocks   func()
		args    args
		wantErr bool
	}{
		{
			name: "Error during HSet operation",
			mocks: func() {
				// The message is passed through an explicit "%s" verb so it is
				// never interpreted as a format string (go vet printf check).
				mock.ExpectHSet(pipelineId.String(), subKey).SetErr(fmt.Errorf("%s", errorMessage))
			},
			args: args{
				err:          fmt.Errorf("%s", errorMessage),
				errorOutput:  nil,
				pipelineId:   pipelineId,
				subKey:       subKey,
				cacheService: &redis.Cache{Client: client},
				errorTitle:   "",
				newStatus:    0,
			},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Register the mock expectations before exercising the function.
			tt.mocks()
			if err := processErrorWithSavingOutput(tt.args.err, tt.args.errorOutput, tt.args.pipelineId, tt.args.subKey, tt.args.cacheService, tt.args.errorTitle, tt.args.newStatus); (err != nil) != tt.wantErr {
				t.Errorf("processErrorWithSavingOutput() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}
// Test_processRunError verifies that processRunError returns an error when it
// is given a redis cache backed by a mock client whose only HSet expectation
// is configured to fail.
func Test_processRunError(t *testing.T) {
	client, mock := redismock.NewClientMock()
	pipelineId := uuid.New()
	errorMessage := "MOCK_ERROR"
	subKey := cache.RunError
	// The channel is buffered and pre-filled with exactly one error, so it
	// can serve the single case below without a sending goroutine.
	errorChannel := make(chan error, 1)
	// The message is passed through an explicit "%s" verb so it is never
	// interpreted as a format string (go vet printf check).
	errorChannel <- fmt.Errorf("%s", errorMessage)
	type args struct {
		errorChannel          chan error
		errorOutput           []byte
		pipelineId            uuid.UUID
		cacheService          cache.Cache
		stopReadLogsChannel   chan bool
		finishReadLogsChannel chan bool
	}
	tests := []struct {
		name    string
		mocks   func()
		args    args
		wantErr bool
	}{
		{
			name: "Error during HSet operation",
			mocks: func() {
				mock.ExpectHSet(pipelineId.String(), subKey).SetErr(fmt.Errorf("%s", errorMessage))
			},
			args: args{
				errorChannel:          errorChannel,
				errorOutput:           nil,
				pipelineId:            pipelineId,
				cacheService:          &redis.Cache{Client: client},
				stopReadLogsChannel:   nil,
				finishReadLogsChannel: nil,
			},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Register the mock expectations before exercising the function.
			tt.mocks()
			if err := processRunError(tt.args.errorChannel, tt.args.errorOutput, tt.args.pipelineId, tt.args.cacheService, tt.args.stopReadLogsChannel, tt.args.finishReadLogsChannel); (err != nil) != tt.wantErr {
				t.Errorf("processRunError() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}
// Test_processCompileSuccess runs processCompileSuccess against a redis cache
// backed by a mock client. Every case registers one more successful HSet
// expectation than the previous one (the first case registers none), and every
// case expects processCompileSuccess to return an error.
func Test_processCompileSuccess(t *testing.T) {
	const wantErr = true

	client, mock := redismock.NewClientMock()
	cacheMock := &redis.Cache{Client: client}
	pipelineId := uuid.New()
	output := "output"

	marshalCompileOutput, _ := json.Marshal(cache.CompileOutput)
	marshalRunOutput, _ := json.Marshal(cache.RunOutput)
	marshalRunError, _ := json.Marshal(cache.RunError)
	marshalLogs, _ := json.Marshal(cache.Logs)
	outputMarshal, _ := json.Marshal(output)
	marshalEmptyString, _ := json.Marshal("")

	// hsetCall is one field/value pair of an HSet expectation.
	type hsetCall struct {
		field []byte
		value []byte
	}
	// successfulCalls lists the HSet expectations in registration order;
	// the case at index i registers the first i of them.
	successfulCalls := []hsetCall{
		{field: marshalCompileOutput, value: outputMarshal},
		{field: marshalRunOutput, value: marshalEmptyString},
		{field: marshalRunError, value: marshalEmptyString},
		{field: marshalLogs, value: marshalEmptyString},
	}
	// The position of a name is the number of successful expectations that
	// the corresponding case registers.
	caseNames := []string{
		"Error during set value to CompileOutput subKey",
		"Error during set value to RunOutput subKey",
		"Error during set value to RunError subKey",
		"Error during set value to Logs subKey",
		"Error during set value to Graph subKey",
	}

	for okCalls, caseName := range caseNames {
		t.Run(caseName, func(t *testing.T) {
			for _, call := range successfulCalls[:okCalls] {
				mock.ExpectHSet(pipelineId.String(), call.field, call.value).SetVal(1)
			}
			err := processCompileSuccess([]byte(output), pipelineId, cacheMock)
			if (err != nil) != wantErr {
				t.Errorf("processCompileSuccess() error = %v, wantErr %v", err, wantErr)
			}
		})
	}
}
// Test_readGraphFile calls readGraphFile with a one-second life cycle context
// and then verifies that a value is present in the cache under the cache.Graph
// sub key of the pipeline.
func Test_readGraphFile(t *testing.T) {
	pipelineLifeCycleCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	type args struct {
		pipelineLifeCycleCtx context.Context
		backgroundCtx        context.Context
		cacheService         cache.Cache
		graphFilePath        string
		pipelineId           uuid.UUID
	}
	cases := []struct {
		name string
		args args
	}{
		{
			name: "Successfully saving the prepared graph to the cache",
			args: args{
				pipelineLifeCycleCtx: pipelineLifeCycleCtx,
				backgroundCtx:        context.Background(),
				cacheService:         cacheService,
				graphFilePath:        graphFilePath,
				pipelineId:           uuid.New(),
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			in := tc.args
			readGraphFile(in.pipelineLifeCycleCtx, in.cacheService, in.graphFilePath, in.pipelineId)

			// The lookup uses the background context rather than the life
			// cycle one; only the presence of a value is checked.
			cached, _ := cacheService.GetValue(in.backgroundCtx, in.pipelineId, cache.Graph)
			if cached != nil {
				return
			}
			t.Errorf("readGraphFile() error: the graph was not cached")
		})
	}
}
// Test_cancelCheck starts cancelCheck in a goroutine with a context that has
// the case's timeout, optionally writes the cache.Canceled flag for the
// pipeline, waits 5*pauseDuration and then expects the context to be done
// (ctx.Err() != nil).
//
// NOTE(review): cancelWaitTime is declared for every case but is never read
// by the loop below, so the flag is always written right after the goroutine
// starts — confirm whether a delay before SetValue was intended.
func Test_cancelCheck(t *testing.T) {
	cases := []struct {
		name           string
		timeout        time.Duration
		cancelWaitTime time.Duration
		setCancel      bool
	}{
		{
			name:           "Successfully canceling the pipeline immediately",
			timeout:        5 * time.Minute,
			cancelWaitTime: 0,
			setCancel:      true,
		},
		{
			name:           "Successfully canceling the pipeline after timeout",
			timeout:        2 * pauseDuration,
			cancelWaitTime: 3 * pauseDuration,
			setCancel:      true,
		},
		{
			name:           "Successfully canceling the pipeline after timeout (immediate timeout)",
			timeout:        0,
			cancelWaitTime: 3 * pauseDuration,
			setCancel:      true,
		},
		{
			name:           "Successfully timing out the pipeline without cancel",
			timeout:        3 * pauseDuration,
			cancelWaitTime: 0,
			setCancel:      false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), tc.timeout)
			defer cancel()

			pipelineId := uuid.New()
			localCache := local.New(ctx)
			go cancelCheck(ctx, pipelineId, cancel, localCache)

			if tc.setCancel {
				_ = localCache.SetValue(ctx, pipelineId, cache.Canceled, true)
			}

			// Give cancelCheck some time to run before inspecting the context.
			time.Sleep(5 * pauseDuration)

			err := ctx.Err()
			if err != nil {
				// The context is done, which is what every case expects.
				return
			}
			t.Errorf("cancelCheck() error expected, err = %v", err)
		})
	}
}