blob: f788f2e613f4f77e37aa6a514d26510dcc87ce42 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"cloud.google.com/go/datastore"
"context"
"errors"
"github.com/google/uuid"
"strconv"
pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/cache"
"beam.apache.org/playground/backend/internal/code_processing"
"beam.apache.org/playground/backend/internal/components"
"beam.apache.org/playground/backend/internal/db"
"beam.apache.org/playground/backend/internal/db/entity"
"beam.apache.org/playground/backend/internal/db/mapper"
"beam.apache.org/playground/backend/internal/emulators"
"beam.apache.org/playground/backend/internal/environment"
cerrors "beam.apache.org/playground/backend/internal/errors"
"beam.apache.org/playground/backend/internal/logger"
"beam.apache.org/playground/backend/internal/setup_tools/life_cycle"
"beam.apache.org/playground/backend/internal/utils"
)
const (
errorTitleGetSnippet = "Error during getting snippet"
errorTitleSaveSnippet = "Error during saving snippet"
errorTitleGetCatalog = "Error during getting catalog"
errorTitleGetExample = "Error during getting example"
errorTitleGetExampleCode = "Error during getting example code"
errorTitleGetExampleOutput = "Error during getting example output"
errorTitleGetExampleLogs = "Error during getting example logs"
errorTitleGetExampleGraph = "Error during getting example graph"
errorTitleGetDefaultExample = "Error during getting default example"
errorTitleRunCode = "Error during run code"
userCloudConnectionErrMsg = "Cloud connection error"
resourceNotFoundErrMsg = "Resource is not found"
resourceInconsistentErrMsg = "Resource is not consistent"
)
// playgroundController processes `gRPC' requests from clients.
// Contains methods to process receiving code, monitor current status of code processing and receive compile/run output.
type playgroundController struct {
env *environment.Environment
cacheService cache.Cache
// Database setup only if the server doesn't suppose to run code, i.e. SDK is unspecified
db db.Database
props *environment.Properties
entityMapper mapper.EntityMapper
cacheComponent *components.CacheComponent
pb.UnimplementedPlaygroundServiceServer
}
// RunCode is running code from requests using a particular SDK
// - In case of incorrect sdk returns codes.InvalidArgument
// - In case of exceeded number of parallel jobs returns codes.ResourceExhausted
// - In case of error during preparing files/folders returns codes.Internal
// - In case of no errors saves playground.Status_STATUS_EXECUTING as cache.Status into cache and sets expiration time
// for all cache values which will be saved into cache during processing received code.
// Returns id of code processing (pipelineId)
func (controller *playgroundController) RunCode(ctx context.Context, info *pb.RunCodeRequest) (*pb.RunCodeResponse, error) {
// check if we can take a new RunCode request
if !utils.CheckNumOfTheParallelJobs(controller.env.ApplicationEnvs.WorkingDir(), controller.env.BeamSdkEnvs.NumOfParallelJobs()) {
logger.Warnf("RunCode(): number of parallel jobs is exceeded\n")
return nil, cerrors.ResourceExhaustedError("Error during preparing", "Number of parallel jobs is exceeded")
}
// check for correct sdk
if info.Sdk != controller.env.BeamSdkEnvs.ApacheBeamSdk {
logger.Errorf("RunCode(): request contains incorrect sdk: %s\n", info.Sdk)
return nil, cerrors.InvalidArgumentError("Error during preparing", "Incorrect sdk. Want to receive %s, but the request contains %s", controller.env.BeamSdkEnvs.ApacheBeamSdk.String(), info.Sdk.String())
}
switch info.Sdk {
case pb.Sdk_SDK_UNSPECIFIED:
logger.Errorf("RunCode(): unimplemented sdk: %s\n", info.Sdk)
return nil, cerrors.InvalidArgumentError("Error during preparing", "Sdk is not implemented yet: %s", info.Sdk.String())
}
cacheExpirationTime := controller.env.ApplicationEnvs.CacheEnvs().KeyExpirationTime()
pipelineId := uuid.New()
sources := make([]entity.FileEntity, 0)
if len(info.Files) > 0 {
for _, file := range info.Files {
sources = append(sources, entity.FileEntity{
Name: file.Name,
Content: file.Content,
IsMain: file.IsMain,
CntxLine: 1,
})
}
} else {
fileName, err := utils.GetFileName("", info.Code, info.Sdk)
if err != nil {
return nil, cerrors.InternalError(errorTitleRunCode, "Failed to get default filename")
}
sources = append(sources, entity.FileEntity{
Name: fileName,
Content: info.Code,
IsMain: true,
CntxLine: 1,
})
}
emulatorConfiguration := emulators.EmulatorConfiguration{
Datasets: info.Datasets,
DatasetsPath: controller.env.ApplicationEnvs.DatasetsPath(),
KafkaEmulatorExecutablePath: controller.env.ApplicationEnvs.KafkaExecutablePath(),
}
lc, err := life_cycle.Setup(info.Sdk, sources, pipelineId, controller.env.ApplicationEnvs.WorkingDir(), controller.env.ApplicationEnvs.PipelinesFolder(), controller.env.BeamSdkEnvs.PreparedModDir(), emulatorConfiguration)
if err != nil {
logger.Errorf("RunCode(): error during setup file system: %s\n", err.Error())
return nil, cerrors.InternalError("Error during preparing", "Error during setup file system for the code processing: %s", err.Error())
}
if err = utils.SetToCache(controller.cacheService, pipelineId, cache.Status, pb.Status_STATUS_VALIDATING); err != nil {
code_processing.DeleteResources(pipelineId, lc)
return nil, cerrors.InternalError("Error during preparing", "Error during saving status of the code processing")
}
if err = utils.SetToCache(controller.cacheService, pipelineId, cache.RunOutputIndex, 0); err != nil {
code_processing.DeleteResources(pipelineId, lc)
return nil, cerrors.InternalError("Error during preparing", "Error during saving initial run output")
}
if err = utils.SetToCache(controller.cacheService, pipelineId, cache.LogsIndex, 0); err != nil {
code_processing.DeleteResources(pipelineId, lc)
return nil, cerrors.InternalError("Error during preparing", "Error during saving value for the logs output")
}
if err = utils.SetToCache(controller.cacheService, pipelineId, cache.Canceled, false); err != nil {
code_processing.DeleteResources(pipelineId, lc)
return nil, cerrors.InternalError("Error during preparing", "Error during saving initial cancel flag")
}
if err = controller.cacheService.SetExpTime(ctx, pipelineId, cacheExpirationTime); err != nil {
logger.Errorf("%s: RunCode(): cache.SetExpTime(): %s\n", pipelineId, err.Error())
code_processing.DeleteResources(pipelineId, lc)
return nil, cerrors.InternalError("Error during preparing", "Internal error")
}
go code_processing.Process(context.Background(), controller.cacheService, lc, pipelineId, &controller.env.ApplicationEnvs, &controller.env.BeamSdkEnvs, info.PipelineOptions)
pipelineInfo := pb.RunCodeResponse{PipelineUuid: pipelineId.String()}
return &pipelineInfo, nil
}
// CheckStatus is checking status for the specific pipeline by PipelineUuid
func (controller *playgroundController) CheckStatus(ctx context.Context, info *pb.CheckStatusRequest) (*pb.CheckStatusResponse, error) {
pipelineId, err := uuid.Parse(info.PipelineUuid)
errorMessage := "Error during getting status of the code processing"
if err != nil {
logger.Errorf("%s: CheckStatus(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
status, err := code_processing.GetProcessingStatus(ctx, controller.cacheService, pipelineId, errorMessage)
if err != nil {
return nil, err
}
return &pb.CheckStatusResponse{Status: status}, nil
}
// GetRunOutput is returning output of execution for specific pipeline by PipelineUuid
func (controller *playgroundController) GetRunOutput(ctx context.Context, info *pb.GetRunOutputRequest) (*pb.GetRunOutputResponse, error) {
pipelineId, err := uuid.Parse(info.PipelineUuid)
errorMessage := "Error during getting run output of the code processing"
if err != nil {
logger.Errorf("%s: GetRunOutput(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
lastIndex, err := code_processing.GetLastIndex(ctx, controller.cacheService, pipelineId, cache.RunOutputIndex, errorMessage)
if err != nil {
return nil, err
}
runOutput, err := code_processing.GetProcessingOutput(ctx, controller.cacheService, pipelineId, cache.RunOutput, errorMessage)
if err != nil {
return nil, err
}
newRunOutput := ""
if len(runOutput) > lastIndex {
newRunOutput = runOutput[lastIndex:]
if err := utils.SetToCache(controller.cacheService, pipelineId, cache.RunOutputIndex, lastIndex+len(newRunOutput)); err != nil {
return nil, cerrors.InternalError(errorMessage, "Error during saving pagination value")
}
}
pipelineResult := pb.GetRunOutputResponse{Output: newRunOutput}
return &pipelineResult, nil
}
// GetLogs is returning logs of execution for specific pipeline by PipelineUuid
func (controller *playgroundController) GetLogs(ctx context.Context, info *pb.GetLogsRequest) (*pb.GetLogsResponse, error) {
errorTitle := utils.GetFuncName(controller.GetRunOutput)
pipelineId, err := uuid.Parse(info.PipelineUuid)
errorMessage := "Error during getting logs of the code processing"
if err != nil {
logger.Errorf("%s: %s: pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, errorTitle, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
lastIndex, err := code_processing.GetLastIndex(ctx, controller.cacheService, pipelineId, cache.LogsIndex, errorMessage)
if err != nil {
return nil, err
}
logs, err := code_processing.GetProcessingOutput(ctx, controller.cacheService, pipelineId, cache.Logs, errorMessage)
if err != nil {
return nil, err
}
newLogs := ""
if len(logs) > lastIndex {
newLogs = logs[lastIndex:]
if err := utils.SetToCache(controller.cacheService, pipelineId, cache.LogsIndex, lastIndex+len(newLogs)); err != nil {
return nil, cerrors.InternalError(errorMessage, "Error during saving pagination value")
}
}
pipelineResult := pb.GetLogsResponse{Output: newLogs}
return &pipelineResult, nil
}
// GetRunError is returning error output of execution for specific pipeline by PipelineUuid
func (controller *playgroundController) GetRunError(ctx context.Context, info *pb.GetRunErrorRequest) (*pb.GetRunErrorResponse, error) {
pipelineId, err := uuid.Parse(info.PipelineUuid)
errorMessage := "Error during getting error output of the code processing"
if err != nil {
logger.Errorf("%s: GetRunError(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
runError, err := code_processing.GetProcessingOutput(ctx, controller.cacheService, pipelineId, cache.RunError, errorMessage)
if err != nil {
return nil, err
}
return &pb.GetRunErrorResponse{Output: runError}, nil
}
// GetValidationOutput is returning output of validation for specific pipeline by PipelineUuid
func (controller *playgroundController) GetValidationOutput(ctx context.Context, info *pb.GetValidationOutputRequest) (*pb.GetValidationOutputResponse, error) {
pipelineId, err := uuid.Parse(info.PipelineUuid)
errorMessage := "Error during getting compilation output"
if err != nil {
logger.Errorf("%s: GetValidationOutput(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
validationOutput, err := code_processing.GetProcessingOutput(ctx, controller.cacheService, pipelineId, cache.ValidationOutput, errorMessage)
if err != nil {
return nil, err
}
return &pb.GetValidationOutputResponse{Output: validationOutput}, nil
}
// GetPreparationOutput is returning output of prepare step for specific pipeline by PipelineUuid
func (controller *playgroundController) GetPreparationOutput(ctx context.Context, info *pb.GetPreparationOutputRequest) (*pb.GetPreparationOutputResponse, error) {
pipelineId, err := uuid.Parse(info.PipelineUuid)
errorMessage := "Error during getting compilation output"
if err != nil {
logger.Errorf("%s: GetPreparationOutput(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
preparationOutput, err := code_processing.GetProcessingOutput(ctx, controller.cacheService, pipelineId, cache.PreparationOutput, errorMessage)
if err != nil {
return nil, err
}
return &pb.GetPreparationOutputResponse{Output: preparationOutput}, nil
}
// GetCompileOutput is returning output of compilation for specific pipeline by PipelineUuid
func (controller *playgroundController) GetCompileOutput(ctx context.Context, info *pb.GetCompileOutputRequest) (*pb.GetCompileOutputResponse, error) {
pipelineId, err := uuid.Parse(info.PipelineUuid)
errorMessage := "Error during getting compilation output"
if err != nil {
logger.Errorf("%s: GetCompileOutput(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
compileOutput, err := code_processing.GetProcessingOutput(ctx, controller.cacheService, pipelineId, cache.CompileOutput, errorMessage)
if err != nil {
return nil, err
}
return &pb.GetCompileOutputResponse{Output: compileOutput}, nil
}
// GetGraph is returning graph of execution for specific pipeline by PipelineUuid
func (controller *playgroundController) GetGraph(ctx context.Context, info *pb.GetGraphRequest) (*pb.GetGraphResponse, error) {
pipelineId, err := uuid.Parse(info.PipelineUuid)
errorMessage := "Error during getting graph output"
if err != nil {
logger.Errorf("%s: GetGraph(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
graph, err := code_processing.GetGraph(ctx, controller.cacheService, pipelineId, errorMessage)
if err != nil {
return nil, err
}
return &pb.GetGraphResponse{Graph: graph}, nil
}
// Cancel is setting cancel flag to stop code processing
func (controller *playgroundController) Cancel(ctx context.Context, info *pb.CancelRequest) (*pb.CancelResponse, error) {
pipelineId, err := uuid.Parse(info.PipelineUuid)
errorMessage := "Error during canceling the code processing"
if err != nil {
logger.Errorf("%s: Cancel(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
if err := utils.SetToCache(controller.cacheService, pipelineId, cache.Canceled, true); err != nil {
return nil, cerrors.InternalError(errorMessage, "Error during saving cancel flag value")
}
return &pb.CancelResponse{}, nil
}
// GetPrecompiledObjects returns the catalog with examples
// Tries to get the whole catalog from the cache
// - If there is no catalog in the cache, gets the catalog from the Datastore and saves it to the cache
// - If SDK or category is specified in the request, gets the catalog from the cache and filters it by SDK and category
func (controller *playgroundController) GetPrecompiledObjects(ctx context.Context, info *pb.GetPrecompiledObjectsRequest) (*pb.GetPrecompiledObjectsResponse, error) {
if err := controller.verifyRouter(); err != nil {
logger.Errorf("GetPrecompiledObjects() error: %s", err.Error())
return nil, err
}
catalog, err := controller.cacheComponent.GetCatalogFromCacheOrDatastore(ctx, controller.env.ApplicationEnvs.CacheRequestTimeout())
if err != nil {
return nil, cerrors.InternalError(errorTitleGetCatalog, userCloudConnectionErrMsg)
}
return &pb.GetPrecompiledObjectsResponse{
SdkCategories: utils.FilterCatalog(catalog, info.Sdk, info.Category),
}, nil
}
// GetPrecompiledObject returns precompiled object from the Datastore or the cache
func (controller *playgroundController) GetPrecompiledObject(ctx context.Context, info *pb.GetPrecompiledObjectRequest) (*pb.GetPrecompiledObjectResponse, error) {
if err := controller.verifyRouter(); err != nil {
logger.Errorf("GetPrecompiledObject() error: %s", err.Error())
return nil, err
}
exampleId := info.GetCloudPath()
sdks, err := controller.cacheComponent.GetSdkCatalogFromCacheOrDatastore(ctx, controller.env.ApplicationEnvs.CacheRequestTimeout())
if err != nil {
return nil, cerrors.InternalError(errorTitleGetExample, userCloudConnectionErrMsg)
}
precompiledObject, err := controller.db.GetExample(ctx, exampleId, sdks)
if err != nil {
switch err {
case datastore.ErrNoSuchEntity:
return nil, cerrors.NotFoundError(errorTitleGetExample, resourceNotFoundErrMsg)
default:
return nil, cerrors.InternalError(errorTitleGetExample, userCloudConnectionErrMsg)
}
}
return &pb.GetPrecompiledObjectResponse{PrecompiledObject: precompiledObject}, nil
}
// GetPrecompiledObjectCode returns the code of the specific example
func (controller *playgroundController) GetPrecompiledObjectCode(ctx context.Context, info *pb.GetPrecompiledObjectCodeRequest) (*pb.GetPrecompiledObjectCodeResponse, error) {
if err := controller.verifyRouter(); err != nil {
logger.Errorf("GetPrecompiledObjectCode() error: %s", err.Error())
return nil, err
}
exampleId := info.GetCloudPath()
files, err := controller.db.GetExampleCode(ctx, exampleId)
if err != nil {
switch err {
case datastore.ErrNoSuchEntity:
return nil, cerrors.NotFoundError(errorTitleGetExampleCode, resourceNotFoundErrMsg)
default:
return nil, cerrors.InternalError(errorTitleGetExampleCode, userCloudConnectionErrMsg)
}
}
if len(files) == 0 {
return nil, cerrors.NotFoundError(errorTitleGetExampleCode, resourceNotFoundErrMsg)
}
response := pb.GetPrecompiledObjectCodeResponse{}
for _, file := range files {
response.Files = append(response.Files, &pb.SnippetFile{
Name: file.Name,
Content: file.Content,
IsMain: file.IsMain,
})
if file.IsMain {
response.Code = file.Content
}
}
if len(response.Files) == 0 || response.Code == "" {
return nil, cerrors.InternalError(errorTitleGetExampleCode, resourceInconsistentErrMsg)
}
return &response, nil
}
// GetPrecompiledObjectOutput returns the output of the compiled and run example
func (controller *playgroundController) GetPrecompiledObjectOutput(ctx context.Context, info *pb.GetPrecompiledObjectOutputRequest) (*pb.GetPrecompiledObjectOutputResponse, error) {
if err := controller.verifyRouter(); err != nil {
logger.Errorf("GetPrecompiledObjectOutput() error: %s", err.Error())
return nil, err
}
exampleId := info.GetCloudPath()
output, err := controller.db.GetExampleOutput(ctx, exampleId)
if err != nil {
switch err {
case datastore.ErrNoSuchEntity:
return nil, cerrors.NotFoundError(errorTitleGetExampleOutput, resourceNotFoundErrMsg)
default:
return nil, cerrors.InternalError(errorTitleGetExampleOutput, userCloudConnectionErrMsg)
}
}
response := pb.GetPrecompiledObjectOutputResponse{Output: output}
return &response, nil
}
// GetPrecompiledObjectLogs returns the logs of the compiled and run example
func (controller *playgroundController) GetPrecompiledObjectLogs(ctx context.Context, info *pb.GetPrecompiledObjectLogsRequest) (*pb.GetPrecompiledObjectLogsResponse, error) {
if err := controller.verifyRouter(); err != nil {
logger.Errorf("GetPrecompiledObjectLogs() error: %s", err.Error())
return nil, err
}
exampleId := info.GetCloudPath()
logs, err := controller.db.GetExampleLogs(ctx, exampleId)
if err != nil {
switch err {
case datastore.ErrNoSuchEntity:
return nil, cerrors.NotFoundError(errorTitleGetExampleLogs, resourceNotFoundErrMsg)
default:
return nil, cerrors.InternalError(errorTitleGetExampleLogs, userCloudConnectionErrMsg)
}
}
response := pb.GetPrecompiledObjectLogsResponse{Output: logs}
return &response, nil
}
// GetPrecompiledObjectGraph returns the graph of the compiled and run example
func (controller *playgroundController) GetPrecompiledObjectGraph(ctx context.Context, info *pb.GetPrecompiledObjectGraphRequest) (*pb.GetPrecompiledObjectGraphResponse, error) {
if err := controller.verifyRouter(); err != nil {
logger.Errorf("GetPrecompiledObjectGraph() error: %s", err.Error())
return nil, err
}
exampleId := info.GetCloudPath()
graph, err := controller.db.GetExampleGraph(ctx, exampleId)
if err != nil {
switch err {
case datastore.ErrNoSuchEntity:
return nil, cerrors.NotFoundError(errorTitleGetExampleGraph, resourceNotFoundErrMsg)
default:
return nil, cerrors.InternalError(errorTitleGetExampleGraph, userCloudConnectionErrMsg)
}
}
response := pb.GetPrecompiledObjectGraphResponse{Graph: graph}
return &response, nil
}
// GetDefaultPrecompiledObject returns the default precompile object for sdk.
func (controller *playgroundController) GetDefaultPrecompiledObject(ctx context.Context, info *pb.GetDefaultPrecompiledObjectRequest) (*pb.GetDefaultPrecompiledObjectResponse, error) {
if err := controller.verifyRouter(); err != nil {
logger.Errorf("GetDefaultPrecompiledObject() error: %s", err.Error())
return nil, err
}
switch info.Sdk {
case pb.Sdk_SDK_UNSPECIFIED:
logger.Errorf("GetDefaultPrecompiledObject(): unimplemented sdk: %s\n", info.Sdk)
return nil, cerrors.InvalidArgumentError(errorTitleGetDefaultExample, "Sdk is not implemented yet: %s", info.Sdk.String())
}
precompiledObject, err := controller.cacheComponent.GetDefaultPrecompiledObjectFromCacheOrDatastore(ctx, info.Sdk, controller.env.ApplicationEnvs.CacheRequestTimeout())
if err != nil {
logger.Errorf("GetDefaultPrecompiledObject(): error during getting catalog: %s", err.Error())
return nil, cerrors.InternalError(errorTitleGetDefaultExample, userCloudConnectionErrMsg)
}
response := pb.GetDefaultPrecompiledObjectResponse{PrecompiledObject: precompiledObject}
return &response, nil
}
// SaveSnippet returns the generated ID
func (controller *playgroundController) SaveSnippet(ctx context.Context, req *pb.SaveSnippetRequest) (*pb.SaveSnippetResponse, error) {
if err := controller.verifyRouter(); err != nil {
logger.Errorf("SaveSnippet() error: %s", err.Error())
return nil, err
}
if req.Sdk == pb.Sdk_SDK_UNSPECIFIED {
logger.Errorf("SaveSnippet(): unimplemented sdk: %s\n", req.Sdk)
return nil, cerrors.InvalidArgumentError(errorTitleSaveSnippet, "Sdk is not implemented yet: %s", req.Sdk.String())
}
if controller.db == nil {
logger.Error("SaveSnippet(): the runner is trying to save the snippet")
return nil, cerrors.InvalidArgumentError(errorTitleSaveSnippet, "The runner doesn't support snippets")
}
if req.Files == nil || len(req.Files) == 0 {
logger.Error("SaveSnippet(): files are empty")
return nil, cerrors.InvalidArgumentError(errorTitleSaveSnippet, "Snippet must have files")
}
if req.PersistenceKey > "" {
logger.Debugf("saving snippet by persistence_key: %v", req.PersistenceKey)
}
snippet := controller.entityMapper.ToSnippet(req)
for _, file := range req.Files {
if file.Content == "" {
logger.Error("SaveSnippet(): entity is empty")
return nil, cerrors.InvalidArgumentError(errorTitleSaveSnippet, "Snippet must have some content")
}
maxSnippetSize := controller.props.MaxSnippetSize
if len(file.Content) > int(maxSnippetSize) {
logger.Errorf("SaveSnippet(): entity is too large. Max entity size: %d symbols", maxSnippetSize)
return nil, cerrors.InvalidArgumentError(errorTitleSaveSnippet, "Snippet size is more than %d symbols", maxSnippetSize)
}
fileEntity, err := controller.entityMapper.ToFileEntity(req, file)
if err != nil {
logger.Errorf("SaveSnippet(): file has wrong properties, err: %s", err.Error())
return nil, cerrors.InvalidArgumentError(errorTitleSaveSnippet, "File content is invalid")
}
snippet.Files = append(snippet.Files, fileEntity)
}
id, err := snippet.ID()
if err != nil {
logger.Errorf("SaveSnippet(): ID(): error during ID generation: %s", err.Error())
return nil, cerrors.InternalError(errorTitleSaveSnippet, "Failed to generate ID")
}
if err = controller.db.PutSnippet(ctx, id, snippet); err != nil {
logger.Errorf("SaveSnippet(): PutSnippet(): error during entity saving: %s", err.Error())
return nil, cerrors.InternalError(errorTitleSaveSnippet, "Failed to save a snippet entity")
}
response := pb.SaveSnippetResponse{Id: id}
return &response, nil
}
// GetSnippet returns the snippet entity
func (controller *playgroundController) GetSnippet(ctx context.Context, info *pb.GetSnippetRequest) (*pb.GetSnippetResponse, error) {
if err := controller.verifyRouter(); err != nil {
logger.Errorf("GetSnippet() error: %s", err.Error())
return nil, err
}
if controller.db == nil {
logger.Error("GetSnippet(): the runner is trying to read the snippet")
return nil, cerrors.InvalidArgumentError(errorTitleGetSnippet, "The runner doesn't support snippets")
}
snippet, err := controller.db.GetSnippet(ctx, info.GetId())
if err != nil {
logger.Errorf("GetSnippet(): error during getting the snippet: %s", err.Error())
if errors.Is(err, datastore.ErrNoSuchEntity) {
return nil, cerrors.NotFoundError(errorTitleGetSnippet, resourceNotFoundErrMsg)
}
return nil, cerrors.InternalError(errorTitleGetSnippet, "Failed to retrieve the snippet")
}
response := pb.GetSnippetResponse{
Sdk: pb.Sdk(pb.Sdk_value[snippet.Sdk.Name]),
PipelineOptions: snippet.PipeOpts,
Complexity: pb.Complexity(pb.Complexity_value[snippet.Complexity]),
}
files, err := controller.db.GetFiles(ctx, info.GetId(), snippet.NumberOfFiles)
if err != nil {
logger.Errorf("GetSnippet(): GetFiles(): error during getting files: %s", err.Error())
return nil, cerrors.InternalError(errorTitleGetSnippet, "Failed to retrieve files")
}
for _, file := range files {
response.Files = append(response.Files, &pb.SnippetFile{
Name: file.Name,
Content: file.Content,
IsMain: file.IsMain,
})
}
return &response, nil
}
// GetMetadata returns runner metadata
func (controller *playgroundController) GetMetadata(_ context.Context, _ *pb.GetMetadataRequest) (*pb.GetMetadataResponse, error) {
commitTimestampInteger, err := strconv.ParseInt(BuildCommitTimestamp, 10, 64)
if err != nil {
logger.Warnf("GetMetadata(): failed to parse BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error())
commitTimestampInteger = 0
}
response := pb.GetMetadataResponse{
RunnerSdk: controller.env.BeamSdkEnvs.ApacheBeamSdk.String(),
BuildCommitHash: BuildCommitHash,
BuildCommitTimestampSecondsSinceEpoch: commitTimestampInteger,
BeamSdkVersion: controller.env.BeamSdkEnvs.BeamVersion,
}
return &response, nil
}
// verifyRouter verifies that controller is configured to work in router mode
func (controller *playgroundController) verifyRouter() error {
if controller.env.BeamSdkEnvs.ApacheBeamSdk != pb.Sdk_SDK_UNSPECIFIED {
return errors.New("server is in runner mode")
}
if controller.db == nil {
return errors.New("no database service")
}
if controller.props == nil {
return errors.New("no properties")
}
if controller.entityMapper == nil {
return errors.New("no entity mapper")
}
if controller.cacheComponent == nil {
return errors.New("no cache component")
}
return nil
}