blob: b1fc191ceb35ac20fe3937bef7d00e31da15cc0d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package code_processing
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"reflect"
"sync"
"time"
"github.com/google/uuid"
pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/cache"
"beam.apache.org/playground/backend/internal/environment"
perrors "beam.apache.org/playground/backend/internal/errors"
"beam.apache.org/playground/backend/internal/executors"
"beam.apache.org/playground/backend/internal/fs_tool"
"beam.apache.org/playground/backend/internal/logger"
"beam.apache.org/playground/backend/internal/setup_tools/builder"
"beam.apache.org/playground/backend/internal/streaming"
"beam.apache.org/playground/backend/internal/utils"
"beam.apache.org/playground/backend/internal/validators"
)
const (
pauseDuration = 500 * time.Millisecond
)
// Process validates, compiles and runs code by pipelineId.
// During each operation updates status of execution and saves it into cache:
// - In case of processing works more that timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
// - In case of code processing has been canceled saves playground.Status_STATUS_CANCELED as cache.Status into cache.
// - In case of validation step is failed saves playground.Status_STATUS_VALIDATION_ERROR as cache.Status into cache.
// - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
// - In case of compile step is completed with no errors saves compile output as cache.CompileOutput into cache.
// - In case of run step is failed saves playground.Status_STATUS_RUN_ERROR as cache.Status and run logs as cache.RunError into cache.
// - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
// At the end of this method deletes all created folders.
func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, pipelineId uuid.UUID, appEnv *environment.ApplicationEnvs, sdkEnv *environment.BeamEnvs, pipelineOptions string) {
pipelineLifeCycleCtx, finishCtxFunc := context.WithTimeout(ctx, appEnv.PipelineExecuteTimeout())
defer func(lc *fs_tool.LifeCycle) {
finishCtxFunc()
DeleteResources(pipelineId, lc)
}(lc)
var validationResults sync.Map
go cancelCheck(pipelineLifeCycleCtx, pipelineId, finishCtxFunc, cacheService)
err := validateStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults)
if err != nil {
var pipelineCanceledError perrors.PipelineCanceledError
if errors.As(err, &pipelineCanceledError) {
logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
} else {
logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error())
}
return
}
err = prepareStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults, lc.GetPreparerParameters())
if err != nil {
var pipelineCanceledError perrors.PipelineCanceledError
if errors.As(err, &pipelineCanceledError) {
logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
} else {
logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error())
}
return
}
// Check if is unit test
validateIsUnitTest, _ := validationResults.Load(validators.UnitTestValidatorName)
isUnitTest := validateIsUnitTest.(bool)
err = compileStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest)
if err != nil {
var pipelineCanceledError perrors.PipelineCanceledError
var compilationError perrors.CompilationError
if errors.As(err, &pipelineCanceledError) {
logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
} else if errors.As(err, &compilationError) {
logger.Warnf("%s: compilation error: %s", pipelineId, compilationError.Error())
} else {
logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error())
}
return
}
// Run/RunTest
err = runStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions)
if err != nil {
var pipelineCanceledError perrors.PipelineCanceledError
var runError perrors.RunError
if errors.As(err, &pipelineCanceledError) {
logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
} else if errors.As(err, &runError) {
logger.Warnf("%s: pipeline execution ended with error from child process: %s", runError.Error())
} else {
logger.Errorf("%s: error during run step: %s", pipelineId, err.Error())
}
return
}
}
func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, isUnitTest bool, sdkEnv *environment.BeamEnvs, pipelineOptions string) error {
errorChannel, successChannel := createStatusChannels()
stopReadLogsChannel := make(chan bool, 1)
finishReadLogsChannel := make(chan bool, 1)
var executorBuilder *executors.ExecutorBuilder
err := error(nil)
if isUnitTest {
executorBuilder, err = builder.TestRunner(ctx, paths, sdkEnv)
} else {
executorBuilder, err = builder.Runner(ctx, paths, utils.ReduceWhiteSpacesToSinge(pipelineOptions), sdkEnv)
}
if err != nil {
if processingErr := processSetupError(err, pipelineId, cacheService); processingErr != nil {
return processingErr
}
return err
}
executor := executorBuilder.Build()
logger.Infof("%s: Run()/Test() ...\n", pipelineId)
runCmd := getExecuteCmd(isUnitTest, &executor, ctx)
var runError bytes.Buffer
runOutput := streaming.RunOutputWriter{Ctx: ctx, CacheService: cacheService, PipelineId: pipelineId}
go readLogFile(ctx, cacheService, paths.AbsoluteLogFilePath, pipelineId, stopReadLogsChannel, finishReadLogsChannel)
go readGraphFile(ctx, cacheService, paths.AbsoluteGraphFilePath, pipelineId)
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
// For go SDK all logs are placed to stdErr.
file, err := os.Create(paths.AbsoluteLogFilePath)
if err != nil {
// If some error with creating a log file do the same as with other SDK.
logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
} else {
// Use the log file to write all stdErr into it.
runCmdWithOutput(runCmd, &runOutput, file, successChannel, errorChannel)
}
} else {
// Other SDKs write logs to the log file on their own.
runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
}
// Start of the monitoring of background tasks (run step/cancellation/timeout)
ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
if err != nil {
return err
}
if !ok {
// If unit test has some error then error output is placed as RunOutput
if isUnitTest {
output, err := GetProcessingOutput(ctx, cacheService, pipelineId, cache.RunOutput, "")
if err == nil {
runError.Write([]byte(output))
}
}
// Run step is finished, but code contains some error (divide by 0 for example)
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
// For Go SDK stdErr was redirected to the log file.
errData, err := os.ReadFile(paths.AbsoluteLogFilePath)
if err != nil {
logger.Errorf("%s: error during read errors from log file (go sdk): %s", pipelineId, err.Error())
}
runError.Write(errData)
}
processingErr := processRunError(errorChannel, runError.Bytes(), pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
if processingErr != nil {
return processingErr
}
return perrors.RunError{Log: runError}
}
// Run step is finished and code is executed
err = processRunSuccess(pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
if err != nil {
return err
}
return nil
}
func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, isUnitTest bool) error {
errorChannel, successChannel := createStatusChannels()
// This condition is used for cases when the playground doesn't compile source files. For the Python code and the Go Unit Tests
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_SCIO || (sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest) {
if err := processCompileSuccess([]byte(""), pipelineId, cacheService); err != nil {
return err
}
} else { // in case of Java, Go (not unit test), Scala - need compile step
executorBuilder, err := builder.Compiler(paths, sdkEnv)
if err != nil {
logger.Errorf("compileStep(): failed creating ExecutorBuilder = %v", executorBuilder)
return nil
}
executor := executorBuilder.Build()
logger.Infof("%s: Compile() ...\n", pipelineId)
compileCmd := executor.Compile(ctx)
var compileError bytes.Buffer
var compileOutput bytes.Buffer
runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
// Start of the monitoring of background tasks (compile step/cancellation/timeout)
ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
if err != nil {
return err
}
if !ok { // Compile step is finished, but code couldn't be compiled (some typos for example)
err := <-errorChannel
processingErr := processErrorWithSavingOutput(err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, "Compile", pb.Status_STATUS_COMPILE_ERROR)
if processingErr != nil {
return processingErr
}
return perrors.CompilationError{Reason: err.Error()}
} // Compile step is finished and code is compiled
if err := processCompileSuccess(compileOutput.Bytes(), pipelineId, cacheService); err != nil {
return err
}
}
return nil
}
func prepareStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, validationResults *sync.Map, prepareParams map[string]string) error {
errorChannel, successChannel := createStatusChannels()
executorBuilder, err := builder.Preparer(paths, sdkEnv, validationResults, prepareParams)
if err != nil {
if processingErr := processSetupError(err, pipelineId, cacheService); processingErr != nil {
return processingErr
}
return err
}
executor := executorBuilder.Build()
logger.Infof("%s: Prepare() ...\n", pipelineId)
prepareFunc := executor.Prepare()
go prepareFunc(successChannel, errorChannel, validationResults)
// Start of the monitoring of background tasks (prepare function/cancellation/timeout)
ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
if err != nil {
return err
}
if !ok {
err := <-errorChannel
// Prepare step is finished, but code couldn't be prepared (some error during prepare step)
processingErr := processErrorWithSavingOutput(err, []byte(err.Error()), pipelineId, cache.PreparationOutput, cacheService, "Prepare", pb.Status_STATUS_PREPARATION_ERROR)
if processingErr != nil {
return processingErr
}
return err
}
// Prepare step is finished and code is prepared
if err := processSuccess(pipelineId, cacheService, "Prepare", pb.Status_STATUS_COMPILING); err != nil {
return err
}
return nil
}
func validateStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, validationResults *sync.Map) error {
errorChannel, successChannel := createStatusChannels()
executorBuilder, err := builder.Validator(paths, sdkEnv)
if err != nil {
if processingError := processSetupError(err, pipelineId, cacheService); processingError != nil {
return processingError
}
return err
}
executor := executorBuilder.Build()
logger.Infof("%s: Validate() ...\n", pipelineId)
validateFunc := executor.Validate()
go validateFunc(successChannel, errorChannel, validationResults)
// Start of the monitoring of background tasks (validate function/cancellation/timeout)
ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
if err != nil {
return err
}
if !ok {
err := <-errorChannel
// Validate step is finished, but code isn't valid
processingErr := processErrorWithSavingOutput(err, []byte(err.Error()), pipelineId, cache.ValidationOutput, cacheService, "Validate", pb.Status_STATUS_VALIDATION_ERROR)
if processingErr != nil {
return processingErr
}
return err
}
// Validate step is finished and code is valid
if err := processSuccess(pipelineId, cacheService, "Validate", pb.Status_STATUS_PREPARING); err != nil {
return err
}
return err
}
func createStatusChannels() (chan error, chan bool) {
errorChannel := make(chan error, 1)
successChannel := make(chan bool, 1)
return errorChannel, successChannel
}
// getExecuteCmd return cmd instance based on the code type: unit test or example code
func getExecuteCmd(isUnitTest bool, executor *executors.Executor, ctxWithTimeout context.Context) *exec.Cmd {
runType := executors.Run
if isUnitTest {
runType = executors.Test
}
cmdReflect := reflect.ValueOf(executor).MethodByName(string(runType)).Call([]reflect.Value{reflect.ValueOf(ctxWithTimeout)})
return cmdReflect[0].Interface().(*exec.Cmd)
}
// processSetupError processes errors during the setting up an executor builder
func processSetupError(err error, pipelineId uuid.UUID, cacheService cache.Cache) error {
logger.Errorf("%s: error during setup builder: %s\n", pipelineId, err.Error())
if err = utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR); err != nil {
logger.Errorf("%s: error during saving error message: %s", pipelineId, err)
return err
}
return nil
}
// GetProcessingOutput gets processing output value from cache by key and subKey.
// In case key doesn't exist in cache - returns an errors.NotFoundError.
// In case subKey doesn't exist in cache for the key - returns an errors.NotFoundError.
// In case value from cache by key and subKey couldn't be converted to string - returns an errors.InternalError.
func GetProcessingOutput(ctx context.Context, cacheService cache.Cache, key uuid.UUID, subKey cache.SubKey, errorTitle string) (string, error) {
value, err := cacheService.GetValue(ctx, key, subKey)
if err != nil {
logger.Errorf("%s: GetProcessingOutput(): cache.GetValue: error: %s", key, err.Error())
return "", perrors.NotFoundError(errorTitle, "Error during getting output")
}
stringValue, converted := value.(string)
if !converted {
logger.Errorf("%s: couldn't convert value to string: %s", key, value)
return "", perrors.InternalError(errorTitle, "Error during getting output")
}
return stringValue, nil
}
// GetProcessingStatus gets processing status from cache by key.
// In case key doesn't exist in cache - returns an errors.NotFoundError.
// In case value from cache by key and subKey couldn't be converted to playground.Status - returns an errors.InternalError.
func GetProcessingStatus(ctx context.Context, cacheService cache.Cache, key uuid.UUID, errorTitle string) (pb.Status, error) {
value, err := cacheService.GetValue(ctx, key, cache.Status)
if err != nil {
logger.Errorf("%s: GetProcessingStatus(): cache.GetValue: error: %s", key, err.Error())
return pb.Status_STATUS_UNSPECIFIED, perrors.NotFoundError(errorTitle, "Error during getting status")
}
statusValue, converted := value.(pb.Status)
if !converted {
logger.Errorf("%s: couldn't convert value to correct status enum: %s", key, value)
return pb.Status_STATUS_UNSPECIFIED, perrors.InternalError(errorTitle, "Error during getting status")
}
return statusValue, nil
}
// GetLastIndex gets last index for run output or logs from cache by key.
// In case key doesn't exist in cache - returns an errors.NotFoundError.
// In case value from cache by key and subKey couldn't be converted to int - returns an errors.InternalError.
func GetLastIndex(ctx context.Context, cacheService cache.Cache, key uuid.UUID, subKey cache.SubKey, errorTitle string) (int, error) {
value, err := cacheService.GetValue(ctx, key, subKey)
if err != nil {
logger.Errorf("%s: GetLastIndex(): cache.GetValue: error: %s", key, err.Error())
return 0, perrors.NotFoundError(errorTitle, "Error during getting pagination value")
}
convertedValue, converted := value.(float64)
if !converted {
logger.Errorf("%s: couldn't convert value to float64. value: %s type %s", key, value, reflect.TypeOf(value))
return 0, perrors.InternalError(errorTitle, "Error during getting pagination value")
}
return int(convertedValue), nil
}
// GetGraph gets graph from cache by key.
// In case key doesn't exist in cache - returns an errors.NotFoundError.
// In case value from cache by key couldn't be converted to []byte - returns an errors.InternalError.
func GetGraph(ctx context.Context, cacheService cache.Cache, key uuid.UUID, errorTitle string) (string, error) {
value, err := cacheService.GetValue(ctx, key, cache.Graph)
if err != nil {
logger.Errorf("%s: GetGraph(): cache.GetValue: error: %s", key, err.Error())
return "", perrors.NotFoundError(errorTitle, "Error during getting graph")
}
stringValue, converted := value.(string)
if !converted {
logger.Errorf("%s: couldn't convert value to string. value: %s type %s", key, value, reflect.TypeOf(value))
return "", perrors.InternalError(errorTitle, "Error during getting graph")
}
return stringValue, nil
}
// runCmdWithOutput runs command with keeping stdOut and stdErr
func runCmdWithOutput(cmd *exec.Cmd, stdOutput io.Writer, stdError io.Writer, successChannel chan bool, errorChannel chan error) {
cmd.Stdout = stdOutput
cmd.Stderr = stdError
go func(cmd *exec.Cmd, successChannel chan bool, errChannel chan error) {
err := cmd.Run()
if err != nil {
errChannel <- err
successChannel <- false
} else {
successChannel <- true
}
}(cmd, successChannel, errorChannel)
}
// reconcileBackgroundTask waits when first background task finishes.
// If finishes by canceling, timeout or context is done - returns error.
// If cmd operation (Validate/Prepare/Compile/Run/RunTest) finishes successfully with no error
//
// during step processing - returns true.
//
// If cmd operation (Validate/Prepare/Compile/Run/RunTest) finishes successfully but with some error
//
// during step processing - returns false.
func reconcileBackgroundTask(pipelineLifeCycleCtx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, successChannel chan bool) (bool, error) {
select {
case <-pipelineLifeCycleCtx.Done():
contextErr := pipelineLifeCycleCtx.Err()
switch contextErr {
case context.DeadlineExceeded:
if err := finishByTimeout(pipelineId, cacheService); err != nil {
return false, fmt.Errorf("error during context timeout processing: %s", err.Error())
}
return false, perrors.PipelineCanceledError{Reason: fmt.Sprintf("code processing context timeout")}
case context.Canceled:
if err := processCancel(cacheService, pipelineId); err != nil {
return false, fmt.Errorf("error during cancellation processing: %s", err.Error())
}
return false, perrors.PipelineCanceledError{Reason: "code processing was canceled"}
default:
return false, fmt.Errorf("code processing cancelled due to unexpected reason: %s", contextErr.Error())
}
case ok := <-successChannel:
return ok, nil
}
}
// cancelCheck checks cancel flag for code processing.
// If cancel flag doesn't exist in cache continue working.
// If context is done it means that the code processing was finished (successfully/with error/timeout). Return.
// If cancel flag exists, and it is true it means that the code processing was canceled. Set true to cancelChannel and return.
func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelFunc context.CancelFunc, cacheService cache.Cache) {
ticker := time.NewTicker(pauseDuration)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Use background context for the cache operation to avoid failure when the main context is timed out.
// Timeouts are handled by the Redis client internally.
cancel, err := cacheService.GetValue(context.Background(), pipelineId, cache.Canceled)
// Only cancel if got the boolean value from the cache. Do not cancel if the value is nil or error happened.
if err != nil {
logger.Errorf("%s: Error during getting value from the cache: %s", pipelineId, err.Error())
} else if cancel != nil && cancel.(bool) {
cancelFunc()
return
}
}
}
}
// readGraphFile reads graph from the file and keeps it to the cache.
// If context is done it means that the code processing was finished (successfully/with error/timeout).
// Write graph to the cache if this in the file.
// In other case each pauseDuration checks that graph file exists or not and try to save it to the cache.
func readGraphFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cache, graphFilePath string, pipelineId uuid.UUID) {
ticker := time.NewTicker(pauseDuration)
defer ticker.Stop()
for {
select {
// waiting when graph file appears
case <-ticker.C:
if _, err := os.Stat(graphFilePath); err == nil {
graph, err := os.ReadFile(graphFilePath)
if err != nil {
logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error())
}
_ = utils.SetToCache(cacheService, pipelineId, cache.Graph, string(graph))
}
// in case of timeout or cancel
case <-pipelineLifeCycleCtx.Done():
if _, err := os.Stat(graphFilePath); err == nil {
graph, err := os.ReadFile(graphFilePath)
if err != nil {
logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error())
}
_ = utils.SetToCache(cacheService, pipelineId, cache.Graph, string(graph))
}
return
}
}
}
// readLogFile reads logs from the log file and keeps it to the cache.
// If context is done it means that the code processing was finished (successfully/with error/timeout). Write last logs to the cache.
// If <-stopReadLogsChannel it means that the code processing was finished (canceled/timeout)
//
// and it waits until the method stops the work to change status to the pb.Status_STATUS_FINISHED. Write last logs
// to the cache and set value to the finishReadLogChannel channel to unblock the code processing.
//
// In other case each pauseDuration write to cache logs of the code processing.
func readLogFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID, stopReadLogsChannel, finishReadLogChannel chan bool) {
ticker := time.NewTicker(pauseDuration)
defer ticker.Stop()
for {
select {
// in case of timeout or cancel
case <-pipelineLifeCycleCtx.Done():
_ = finishReadLogFile(ticker, cacheService, logFilePath, pipelineId)
return
// in case of pipeline finish successfully or has error on the run step
case <-stopReadLogsChannel:
_ = finishReadLogFile(ticker, cacheService, logFilePath, pipelineId)
finishReadLogChannel <- true
return
case <-ticker.C:
_ = writeLogsToCache(cacheService, logFilePath, pipelineId)
}
}
}
// finishReadLogFile is used to read logs file for the last time
func finishReadLogFile(ticker *time.Ticker, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
ticker.Stop()
return writeLogsToCache(cacheService, logFilePath, pipelineId)
}
// writeLogsToCache write all logs from the log file to the cache.
// If log file doesn't exist, return nil.
//
// Reading logs works as a parallel with code processing so when program tries to read file
// it could be that the file doesn't exist yet.
//
// If log file exists, read all from the log file and keep it to the cache using cache.Logs subKey.
// If some error occurs, log the error and return the error.
func writeLogsToCache(cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
if _, err := os.Stat(logFilePath); os.IsNotExist(err) {
return nil
}
logs, err := os.ReadFile(logFilePath)
if err != nil {
logger.Errorf("%s: writeLogsToCache(): error during read from logs file: %s", pipelineId, err.Error())
return err
}
return utils.SetToCache(cacheService, pipelineId, cache.Logs, string(logs))
}
// DeleteResources removes all prepared resources for received LifeCycle
func DeleteResources(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
logger.Infof("%s: DeleteResources() ...\n", pipelineId)
if err := lc.DeleteFolders(); err != nil {
logger.Error("%s: DeleteResources(): %s\n", pipelineId, err.Error())
}
lc.StopEmulators()
logger.Infof("%s: DeleteResources() complete\n", pipelineId)
logger.Infof("%s: complete\n", pipelineId)
}
// finishByTimeout is used in case of runCode method finished by timeout
func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error {
logger.Warnf("%s: code processing finishes because of timeout\n", pipelineId)
// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
}
// processErrorWithSavingOutput processes error with saving to cache received error output.
func processErrorWithSavingOutput(err error, errorOutput []byte, pipelineId uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
logger.Warnf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput)
if err := utils.SetToCache(cacheService, pipelineId, subKey, fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil {
logger.Errorf("%s: failed to save error message to cache: %s", pipelineId, err.Error())
return err
}
if err = utils.SetToCache(cacheService, pipelineId, cache.Status, newStatus); err != nil {
logger.Errorf("%s: failed to save status to cache: %s", pipelineId, err.Error())
return err
}
return nil
}
// processRunError processes error received during processing run step.
// This method sets error output to the cache and after that sets value to channel to stop goroutine which writes logs.
//
// After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method
// sets corresponding status to the cache.
func processRunError(errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
err := <-errorChannel
logger.Warnf("%s: Run(): err: %s, output: %s\n", pipelineId, err.Error(), errorOutput)
if err := utils.SetToCache(cacheService, pipelineId, cache.RunError, fmt.Sprintf("error: %s\noutput: %s", err.Error(), string(errorOutput))); err != nil {
return err
}
stopReadLogsChannel <- true
<-finishReadLogsChannel
return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
}
// processSuccess processes case after successful process validation or preparation steps.
// This method sets corresponding status to the cache.
func processSuccess(pipelineId uuid.UUID, cacheService cache.Cache, successTitle string, newStatus pb.Status) error {
logger.Infof("%s: %s(): finish\n", pipelineId, successTitle)
return utils.SetToCache(cacheService, pipelineId, cache.Status, newStatus)
}
// processCompileSuccess processes case after successful compile step.
// This method sets output of the compile step, sets empty string as output of the run step and
//
// sets corresponding status to the cache.
func processCompileSuccess(output []byte, pipelineId uuid.UUID, cacheService cache.Cache) error {
logger.Infof("%s: Compile() finish\n", pipelineId)
if err := utils.SetToCache(cacheService, pipelineId, cache.CompileOutput, string(output)); err != nil {
return err
}
if err := utils.SetToCache(cacheService, pipelineId, cache.RunOutput, ""); err != nil {
return err
}
if err := utils.SetToCache(cacheService, pipelineId, cache.RunError, ""); err != nil {
return err
}
if err := utils.SetToCache(cacheService, pipelineId, cache.Logs, ""); err != nil {
return err
}
if err := utils.SetToCache(cacheService, pipelineId, cache.Graph, ""); err != nil {
return err
}
return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_EXECUTING)
}
// processRunSuccess processes case after successful run step.
// This method sets value to channel to stop goroutine which writes logs.
//
// After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method
// sets corresponding status to the cache.
func processRunSuccess(pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
logger.Infof("%s: Run() finish\n", pipelineId)
stopReadLogsChannel <- true
<-finishReadLogsChannel
return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_FINISHED)
}
// processCancel process case when code processing was canceled
func processCancel(cacheService cache.Cache, pipelineId uuid.UUID) error {
logger.Infof("%s: was canceled\n", pipelineId)
// set to cache pipelineId: cache.SubKey_Status: pb.Status_STATUS_CANCELED
return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_CANCELED)
}