blob: ede219294d76d33618eaee4905c69654f412d115 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package life_cycle
import (
"beam.apache.org/playground/backend/internal/emulators"
"bufio"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/google/uuid"
pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/db/entity"
"beam.apache.org/playground/backend/internal/fs_tool"
"beam.apache.org/playground/backend/internal/logger"
utils "beam.apache.org/playground/backend/internal/utils"
)
// File names, commands and placeholders used while preparing the per-SDK pipeline folders.
const (
// javaLogConfigFileName is the Java logging config file copied from the working directory
// into the pipeline base folder.
javaLogConfigFileName = "logging.properties"
// javaTmpLogConfigFile is the temporary file the updated logging config is written to
// before it is renamed over javaLogConfigFileName.
javaTmpLogConfigFile = "logging_updated.properties"
// javaLogFilePlaceholder is the token in the logging config that is replaced with the
// absolute path of the pipeline log file.
javaLogFilePlaceholder = "{logFilePath}"
// goModFileName and goSumFileName are the Go module files copied from the prepared
// module directory into the pipeline base folder.
goModFileName = "go.mod"
goSumFileName = "go.sum"
// bashCmd is the executable used to run the Scio project script.
bashCmd = "bash"
// scioProjectName is the name of the Scio project folder inside the pipeline folder.
scioProjectName = "scio"
// scioProjectPath is the path, relative to the pipeline folder, of the folder that holds
// the Scio source files.
scioProjectPath = scioProjectName + "/src/main/scala/" + scioProjectName
// logFileName is the name of the log file placed in the Scio source folder.
logFileName = "logs.log"
// defaultExampleInSbt is the file removed from the Scio source folder after the project
// script has run.
defaultExampleInSbt = "WordCount.scala"
// scioProject is the script, located in the working directory, that is run with bashCmd
// from inside the pipeline folder.
scioProject = "new_scio_project.sh"
// scioCommonConstants is the file copied from the working directory into the Scio
// source folder.
scioCommonConstants = "ExampleData.scala"
)
// Setup creates and prepares the file-system life cycle for a single pipeline.
//
// It performs the following steps in order:
//  1. creates the life cycle rooted at workingDir/pipelinesFolder and its folders;
//  2. prepares SDK-specific files: for Go the module files are copied from
//     preparedModDir, for Java the logging config is copied from workingDir and
//     updated, for Scio the project script from workingDir is run inside the
//     pipeline folder; any other sdk value needs no extra files;
//  3. creates the source code files from sources;
//  4. starts the emulators, but only when
//     emulatorConfiguration.KafkaEmulatorExecutablePath is not empty.
//
// When step 2, 3 or 4 fails, the folders created in step 1 are deleted before the
// error is returned; a failure of that cleanup is reported together with the
// original error. On success the prepared life cycle is returned, otherwise the
// returned life cycle is nil.
func Setup(sdk pb.Sdk, sources []entity.FileEntity, pipelineId uuid.UUID, workingDir, pipelinesFolder, preparedModDir string, emulatorConfiguration emulators.EmulatorConfiguration) (*fs_tool.LifeCycle, error) {
	// create file system service
	lc, err := fs_tool.NewLifeCycle(sdk, pipelineId, filepath.Join(workingDir, pipelinesFolder))
	if err != nil {
		logger.Errorf("%s: error during create new life cycle: %s\n", pipelineId, err.Error())
		return nil, errors.New("error during create a new file system")
	}

	// create folders
	if err = lc.CreateFolders(); err != nil {
		logger.Errorf("%s: error during create folders: %s\n", pipelineId, err.Error())
		return nil, errors.New("error during prepare necessary folders")
	}

	// copy necessary files; err is nil here, so SDKs without a case fall through untouched
	var sdkName string
	switch sdk {
	case pb.Sdk_SDK_GO:
		sdkName = "Go"
		err = prepareGoFiles(lc, preparedModDir, pipelineId)
	case pb.Sdk_SDK_JAVA:
		sdkName = "Java"
		err = prepareJavaFiles(lc, workingDir, pipelineId)
	case pb.Sdk_SDK_SCIO:
		sdkName = "Scio"
		// prepareSbtFiles returns the life cycle even on failure, so cleanup below still works.
		lc, err = prepareSbtFiles(lc, lc.Paths.AbsoluteBaseFolderPath, workingDir)
	}
	if err != nil {
		if errDelete := lc.DeleteFolders(); errDelete != nil {
			return nil, fmt.Errorf("error during cleanup handling error when creating necessary files for %s sdk: %w, cleanup error: %s", sdkName, err, errDelete.Error())
		}
		return nil, fmt.Errorf("error during create necessary files for the %s sdk: %w", sdkName, err)
	}

	// create file with code
	if err = lc.CreateSourceCodeFiles(sources); err != nil {
		logger.Errorf("%s: RunCode(): CreateSourceCodeFile(): %s\n", pipelineId, err.Error())
		if errDelete := lc.DeleteFolders(); errDelete != nil {
			return nil, fmt.Errorf("error during cleaning up when handling error %w, cleanup error: %s", err, errDelete.Error())
		}
		return nil, errors.New("error during create file with code")
	}

	// start emulators
	if emulatorConfiguration.KafkaEmulatorExecutablePath == "" {
		logger.Warnf("kafka emulator executable path is empty, emulators will not be started")
		return lc, nil
	}
	if err = lc.StartEmulators(emulatorConfiguration); err != nil {
		logger.Errorf("error during starting emulators: %s", err.Error())
		// Stop the emulators before touching the folders: previously a failed folder
		// cleanup returned early and skipped StopEmulators altogether.
		lc.StopEmulators()
		if errDelete := lc.DeleteFolders(); errDelete != nil {
			return nil, fmt.Errorf("error during cleaning up when handling error %w, cleanup error: %s", err, errDelete.Error())
		}
		return nil, err
	}
	return lc, nil
}
// prepareGoFiles stages the Go module files for a pipeline.
// It copies go.mod and then go.sum from preparedModDir into the base folder of the
// life cycle. The first copy that fails is logged with the pipeline id and its error
// is returned; the remaining file is not attempted.
func prepareGoFiles(lc *fs_tool.LifeCycle, preparedModDir string, pipelineId uuid.UUID) error {
	destination := lc.Paths.AbsoluteBaseFolderPath
	for _, moduleFile := range []string{goModFileName, goSumFileName} {
		copyErr := utils.CopyFilePreservingName(moduleFile, preparedModDir, destination)
		if copyErr == nil {
			continue
		}
		logger.Errorf("%s: error during copying %s file: %s\n", pipelineId, moduleFile, copyErr.Error())
		return copyErr
	}
	return nil
}
// prepareJavaFiles stages the Java logging configuration for a pipeline.
// The logging.properties file is copied from workingDir into the base folder of the
// life cycle and then rewritten for this pipeline by updateJavaLogConfigFile.
// A failure of either step is logged with the pipeline id and returned.
func prepareJavaFiles(lc *fs_tool.LifeCycle, workingDir string, pipelineId uuid.UUID) error {
	if copyErr := utils.CopyFilePreservingName(javaLogConfigFileName, workingDir, lc.Paths.AbsoluteBaseFolderPath); copyErr != nil {
		logger.Errorf("%s: error during copying logging.properties file: %s\n", pipelineId, copyErr.Error())
		return copyErr
	}

	if updateErr := updateJavaLogConfigFile(lc.Paths); updateErr != nil {
		logger.Errorf("%s: error during updating logging.properties file: %s\n", pipelineId, updateErr.Error())
		return updateErr
	}

	return nil
}
// updateJavaLogConfigFile rewrites the Java logging config in the pipeline base folder
// so that every occurrence of the log file placeholder is replaced with
// paths.AbsoluteLogFilePath.
//
// The config is read line by line and written to a temporary file in the same folder,
// each line terminated with "\n"; the temporary file is then renamed over the original.
// Both files are closed before the rename, and the temporary file is removed if
// writing or renaming fails.
//
// It returns an error if the config file does not exist or cannot be opened, if the
// temporary file cannot be created, written or closed, if scanning fails, or if the
// rename fails.
func updateJavaLogConfigFile(paths fs_tool.LifeCyclePaths) error {
	logConfigFilePath := filepath.Join(paths.AbsoluteBaseFolderPath, javaLogConfigFileName)
	logConfigUpdatedFilePath := filepath.Join(paths.AbsoluteBaseFolderPath, javaTmpLogConfigFile)

	if _, err := os.Stat(logConfigFilePath); os.IsNotExist(err) {
		return err
	}

	logConfigFile, err := os.Open(logConfigFilePath)
	if err != nil {
		return err
	}

	updatedFile, err := os.Create(logConfigUpdatedFilePath)
	if err != nil {
		// Read-only handle: a close failure carries no useful information here.
		_ = logConfigFile.Close()
		return err
	}

	// Copy the config line by line, substituting the placeholder.
	copyErr := func() error {
		scanner := bufio.NewScanner(logConfigFile)
		for scanner.Scan() {
			line := strings.ReplaceAll(scanner.Text(), javaLogFilePlaceholder, paths.AbsoluteLogFilePath)
			if _, writeErr := io.WriteString(updatedFile, line+"\n"); writeErr != nil {
				return writeErr
			}
		}
		return scanner.Err()
	}()

	// Release both handles before renaming, whether or not the copy succeeded.
	// Read-only handle: a close failure carries no useful information here.
	_ = logConfigFile.Close()
	// A close error on the written file can mean data was not persisted, so it is
	// reported unless an earlier error already explains the failure.
	if closeErr := updatedFile.Close(); copyErr == nil {
		copyErr = closeErr
	}
	if copyErr != nil {
		// Best-effort removal of the incomplete temporary file.
		_ = os.Remove(logConfigUpdatedFilePath)
		return copyErr
	}

	if err = os.Rename(logConfigUpdatedFilePath, logConfigFilePath); err != nil {
		// Best-effort removal of the temporary file that could not be moved into place.
		_ = os.Remove(logConfigUpdatedFilePath)
		return err
	}
	return nil
}
// prepareSbtFiles prepares the Scio project for a pipeline and re-points the life
// cycle paths at it.
//
// It runs the new_scio_project.sh script from workingDir with bash, using
// pipelineFolder as the current directory (presumably the script generates the sbt
// project skeleton - the script itself is not part of this file). It then removes the
// default WordCount.scala example from the project source folder, copies
// ExampleData.scala from workingDir into that folder, and replaces lc.Paths so that
// the source, executable, base, log and graph paths all live in the source folder and
// ProjectDir points at the project folder.
//
// The passed life cycle is returned in every case, including on error, so that the
// caller can still use it for cleanup; lc.Paths is only replaced when every step has
// succeeded.
func prepareSbtFiles(lc *fs_tool.LifeCycle, pipelineFolder string, workingDir string) (*fs_tool.LifeCycle, error) {
	cmd := exec.Command(bashCmd, filepath.Join(workingDir, scioProject))
	cmd.Dir = pipelineFolder
	if _, err := cmd.Output(); err != nil {
		return lc, err
	}

	// Resolve the two root folders; a failure here would otherwise yield empty paths
	// and make the file operations below act relative to the current directory.
	absFileFolderPath, err := filepath.Abs(filepath.Join(pipelineFolder, scioProjectPath))
	if err != nil {
		return lc, err
	}
	projectFolder, err := filepath.Abs(filepath.Join(pipelineFolder, scioProjectName))
	if err != nil {
		return lc, err
	}

	// absFileFolderPath is already absolute, so joining onto it gives absolute, cleaned paths.
	fileName := lc.Paths.SourceFileName
	absFilePath := filepath.Join(absFileFolderPath, fileName)
	absLogFilePath := filepath.Join(absFileFolderPath, logFileName)
	absGraphFilePath := filepath.Join(absFileFolderPath, utils.GraphFileName)
	executableName := lc.Paths.FindExecutableName

	if err = os.Remove(filepath.Join(absFileFolderPath, defaultExampleInSbt)); err != nil {
		return lc, err
	}

	if err = utils.CopyFilePreservingName(scioCommonConstants, workingDir, absFileFolderPath); err != nil {
		return lc, err
	}

	// The executable entries reuse the source file values.
	lc.Paths = fs_tool.LifeCyclePaths{
		SourceFileName:                   fileName,
		AbsoluteSourceFileFolderPath:     absFileFolderPath,
		AbsoluteSourceFilePath:           absFilePath,
		ExecutableFileName:               fileName,
		AbsoluteExecutableFileFolderPath: absFileFolderPath,
		AbsoluteExecutableFilePath:       absFilePath,
		AbsoluteBaseFolderPath:           absFileFolderPath,
		AbsoluteLogFilePath:              absLogFilePath,
		AbsoluteGraphFilePath:            absGraphFilePath,
		ProjectDir:                       projectFolder,
		FindExecutableName:               executableName,
	}
	return lc, nil
}