blob: 2f8f273b0a36b25d832cb3cf3705b7b5445f39c6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package environment
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/logger"
)
const (
serverIpKey = "SERVER_IP"
serverPortKey = "SERVER_PORT"
beamSdkKey = "BEAM_SDK"
beamVersionKey = "BEAM_VERSION"
workingDirKey = "APP_WORK_DIR"
preparedModDirKey = "PREPARED_MOD_DIR"
numOfParallelJobsKey = "NUM_PARALLEL_JOBS"
cacheTypeKey = "CACHE_TYPE"
cacheAddressKey = "CACHE_ADDRESS"
beamPathKey = "BEAM_PATH"
cacheKeyExpirationTimeKey = "KEY_EXPIRATION_TIME"
pipelineExecuteTimeoutKey = "PIPELINE_EXPIRATION_TIMEOUT"
protocolTypeKey = "PROTOCOL_TYPE"
launchSiteKey = "LAUNCH_SITE"
projectIdKey = "GOOGLE_CLOUD_PROJECT"
pipelinesFolderKey = "PIPELINES_FOLDER_NAME"
defaultPipelinesFolder = "executable_files"
defaultLaunchSite = "local"
defaultProtocol = "HTTP"
defaultIp = "localhost"
defaultPort = 8080
defaultSdk = pb.Sdk_SDK_UNSPECIFIED
defaultBeamVersion = "<unknown>"
defaultBeamJarsPath = "/opt/apache/beam/jars/*"
defaultDatasetsPath = "/opt/playground/backend/datasets"
defaultKafkaEmulatorExecutablePath = "/opt/playground/backend/kafka-emulator/beam-playground-kafka-emulator.jar"
defaultCacheType = "local"
defaultCacheAddress = "localhost:6379"
defaultCacheKeyExpirationTime = time.Minute * 15
defaultPipelineExecuteTimeout = time.Minute * 10
jsonExt = ".json"
configFolderName = "configs"
defaultNumOfParallelJobs = 20
SDKConfigPathKey = "SDK_CONFIG"
defaultSDKConfigPath = "../sdks.yaml"
propertyPathKey = "PROPERTY_PATH"
datasetsPathKey = "DATASETS_PATH"
kafkaEmulatorExecutablePathKey = "KAFKA_EMULATOR_EXECUTABLE_PATH"
defaultPropertyPath = "."
cacheRequestTimeoutKey = "CACHE_REQUEST_TIMEOUT"
defaultCacheRequestTimeout = time.Second * 5
cleanupSnippetsFunctionsUrlKey = "CLEANUP_SNIPPETS_FUNCTIONS_URL"
defaultCleanupSnippetsFunctionsUrl = "http://cleanup_snippets:8080/"
putSnippetFunctionsUrlKey = "PUT_SNIPPET_FUNCTIONS_URL"
defaultPutSnippetFunctionsUrl = "http://put_snippet:8080/"
incrementSnippetViewsFunctionsUrlKey = "INCREMENT_SNIPPET_VIEWS_FUNCTIONS_URL"
defaultIncrementSnippetViewsFunctionsUrl = "http://increment_snippet_views:8080/"
)
// Environment operates with environment structures: NetworkEnvs, BeamEnvs, ApplicationEnvs
// Environment contains all environment variables which are used by the application
type Environment struct {
NetworkEnvs NetworkEnvs
BeamSdkEnvs BeamEnvs
ApplicationEnvs ApplicationEnvs
}
// NewEnvironment is a constructor for Environment.
// Default values:
// LogWriters: by default using os.Stdout
// NetworkEnvs: by default using defaultIp, defaultPort and defaultProtocol from constants
// BeamEnvs: by default using pb.Sdk_SDK_JAVA
// ApplicationEnvs: required field not providing by default value
func NewEnvironment(networkEnvs NetworkEnvs, beamEnvs BeamEnvs, appEnvs ApplicationEnvs) *Environment {
svc := Environment{}
svc.NetworkEnvs = networkEnvs
svc.BeamSdkEnvs = beamEnvs
svc.ApplicationEnvs = appEnvs
return &svc
}
// GetApplicationEnvsFromOsEnvs returns ApplicationEnvs.
// Lookups in os environment variables and tries to take values for all (exclude working dir) ApplicationEnvs parameters.
// In case some value doesn't exist sets default values:
// - pipeline execution timeout: 10 minutes
// - cache expiration time: 15 minutes
// - type of cache: local
// - cache address: localhost:6379
//
// If os environment variables don't contain a value for app working dir - returns error.
func GetApplicationEnvsFromOsEnvs() (*ApplicationEnvs, error) {
pipelineExecuteTimeout := getEnvAsDuration(pipelineExecuteTimeoutKey, defaultPipelineExecuteTimeout, "couldn't convert provided pipeline execute timeout. Using default %s\n")
cacheExpirationTime := getEnvAsDuration(cacheKeyExpirationTimeKey, defaultCacheKeyExpirationTime, "couldn't convert provided cache expiration time. Using default %s\n")
cacheType := getEnv(cacheTypeKey, defaultCacheType)
cacheAddress := getEnv(cacheAddressKey, defaultCacheAddress)
launchSite := getEnv(launchSiteKey, defaultLaunchSite)
projectId := os.Getenv(projectIdKey)
pipelinesFolder := getEnv(pipelinesFolderKey, defaultPipelinesFolder)
sdkConfigPath := getEnv(SDKConfigPathKey, defaultSDKConfigPath)
propertyPath := getEnv(propertyPathKey, defaultPropertyPath)
datasetsPath := getEnv(datasetsPathKey, defaultDatasetsPath)
kafkaEmulatorExecutablePath := getEnv(kafkaEmulatorExecutablePathKey, defaultKafkaEmulatorExecutablePath)
cacheRequestTimeout := getEnvAsDuration(cacheRequestTimeoutKey, defaultCacheRequestTimeout, "couldn't convert provided cache request timeout. Using default %s\n")
cleanupSnippetsFunctionsUrl := getEnv(cleanupSnippetsFunctionsUrlKey, defaultCleanupSnippetsFunctionsUrl)
putSnippetFunctionsUrl := getEnv(putSnippetFunctionsUrlKey, defaultPutSnippetFunctionsUrl)
incrementSnippetViewsFunctionsUrl := getEnv(incrementSnippetViewsFunctionsUrlKey, defaultIncrementSnippetViewsFunctionsUrl)
if value, present := os.LookupEnv(workingDirKey); present {
return NewApplicationEnvs(
value,
launchSite,
projectId,
pipelinesFolder,
sdkConfigPath,
propertyPath,
kafkaEmulatorExecutablePath,
datasetsPath,
cleanupSnippetsFunctionsUrl,
putSnippetFunctionsUrl,
incrementSnippetViewsFunctionsUrl,
NewCacheEnvs(
cacheType,
cacheAddress,
cacheExpirationTime,
),
pipelineExecuteTimeout,
cacheRequestTimeout,
), nil
}
return nil, errors.New("APP_WORK_DIR env should be provided with os.env")
}
// GetNetworkEnvsFromOsEnvs returns NetworkEnvs.
// Lookups in os environment variables and takes values for ip and port.
// In case some value doesn't exist sets default values:
// - ip: localhost
// - port: 8080
func GetNetworkEnvsFromOsEnvs() (*NetworkEnvs, error) {
ip := getEnv(serverIpKey, defaultIp)
port := defaultPort
protocol := getEnv(protocolTypeKey, defaultProtocol)
var err error
if value, present := os.LookupEnv(serverPortKey); present {
port, err = strconv.Atoi(value)
if err != nil {
return nil, err
}
}
return NewNetworkEnvs(ip, port, protocol), nil
}
// ConfigureBeamEnvs returns BeamEnvs.
// Lookups in os environment variables and takes value for Apache Beam SDK.
// If os environment variables don't contain a value for Apache Beam SDK - returns error.
// Configures ExecutorConfig with config file.
func ConfigureBeamEnvs(workDir string) (*BeamEnvs, error) {
sdk := pb.Sdk_SDK_UNSPECIFIED
preparedModDir, modDirExist := os.LookupEnv(preparedModDirKey)
numOfParallelJobs := getEnvAsInt(numOfParallelJobsKey, defaultNumOfParallelJobs)
beamVersion := getEnv(beamVersionKey, defaultBeamVersion)
if value, present := os.LookupEnv(beamSdkKey); present {
switch value {
case pb.Sdk_SDK_JAVA.String():
sdk = pb.Sdk_SDK_JAVA
case pb.Sdk_SDK_GO.String():
sdk = pb.Sdk_SDK_GO
if !modDirExist {
return nil, errors.New("env PREPARED_MOD_DIR must be specified in the environment variables for GO sdk")
}
case pb.Sdk_SDK_PYTHON.String():
sdk = pb.Sdk_SDK_PYTHON
case pb.Sdk_SDK_SCIO.String():
sdk = pb.Sdk_SDK_SCIO
}
}
if sdk == pb.Sdk_SDK_UNSPECIFIED {
return NewBeamEnvs(sdk, beamVersion, nil, preparedModDir, numOfParallelJobs), nil
}
configPath := filepath.Join(workDir, configFolderName, sdk.String()+jsonExt)
executorConfig, err := createExecutorConfig(sdk, configPath)
if err != nil {
return nil, err
}
return NewBeamEnvs(sdk, beamVersion, executorConfig, preparedModDir, numOfParallelJobs), nil
}
// createExecutorConfig creates ExecutorConfig that corresponds to specific Apache Beam SDK.
// Configures ExecutorConfig with config file which is located at configPath.
func createExecutorConfig(apacheBeamSdk pb.Sdk, configPath string) (*ExecutorConfig, error) {
executorConfig, err := getConfigFromJson(configPath)
if err != nil {
return nil, err
}
switch apacheBeamSdk {
case pb.Sdk_SDK_JAVA:
args, err := ConcatBeamJarsToString()
if err != nil {
return nil, fmt.Errorf("error during processing jars: %s", err.Error())
}
executorConfig.CompileArgs = append(executorConfig.CompileArgs, args)
executorConfig.RunArgs[1] = fmt.Sprintf("%s%s", executorConfig.RunArgs[1], args)
executorConfig.TestArgs[1] = fmt.Sprintf("%s%s", executorConfig.TestArgs[1], args)
case pb.Sdk_SDK_GO:
// Go sdk doesn't need any additional arguments from the config file
case pb.Sdk_SDK_PYTHON:
// Python sdk doesn't need any additional arguments from the config file
case pb.Sdk_SDK_SCIO:
// Scala sdk doesn't need any additional arguments from the config file
}
return executorConfig, nil
}
func ConcatBeamJarsToString() (string, error) {
jars, err := filepath.Glob(getEnv(beamPathKey, defaultBeamJarsPath))
if err != nil {
return "", err
}
args := strings.Join(jars, ":")
return args, nil
}
// getConfigFromJson reads a json file to ExecutorConfig
func getConfigFromJson(configPath string) (*ExecutorConfig, error) {
file, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, err
}
executorConfig := ExecutorConfig{}
err = json.Unmarshal(file, &executorConfig)
if err != nil {
return nil, err
}
return &executorConfig, err
}
// getEnv returns an environment variable or default value
func getEnv(key, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return defaultValue
}
// getEnvAsInt returns an environment variable or default value as integer
func getEnvAsInt(key string, defaultValue int) int {
if value, present := os.LookupEnv(key); present {
convertedValue, err := strconv.Atoi(value)
if err != nil {
logger.Errorf("Incorrect value for %s. Should be integer. Will be used default value: %d", key, defaultValue)
return defaultValue
} else {
if convertedValue <= 0 {
logger.Errorf("Incorrect value for %s. Should be a positive integer value but it is %d. Will be used default value: %d", key, convertedValue, defaultValue)
return defaultValue
} else {
return convertedValue
}
}
}
return defaultValue
}
// getEnvAsDuration returns an environment variable or default value as duration
func getEnvAsDuration(key string, defaultValue time.Duration, errMsg string) time.Duration {
if value, present := os.LookupEnv(key); present {
if converted, err := time.ParseDuration(value); err == nil {
return converted
} else {
log.Printf(errMsg, defaultValue)
}
}
return defaultValue
}