| /* |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package services |
| |
| import ( |
| "context" |
| "fmt" |
| "os" |
| "path/filepath" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/apache/incubator-devlake/core/dal" |
| "github.com/apache/incubator-devlake/core/errors" |
| "github.com/apache/incubator-devlake/core/models" |
| "github.com/apache/incubator-devlake/core/plugin" |
| "github.com/apache/incubator-devlake/core/utils" |
| "github.com/apache/incubator-devlake/impls/logruslog" |
| "github.com/google/uuid" |
| v11 "go.temporal.io/api/enums/v1" |
| "go.temporal.io/sdk/client" |
| "go.temporal.io/sdk/converter" |
| "golang.org/x/sync/semaphore" |
| ) |
| |
| var notificationService *NotificationService |
| var temporalClient client.Client |
| var globalPipelineLog = logruslog.Global.Nested("pipeline service") |
| |
| // PipelineQuery is a query for GetPipelines |
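| // An illustrative request using the form tags below (path and values are |
| // hypothetical): |
| // |
| //	GET /pipelines?status=TASK_RUNNING&pending=1&label=parallel%2Fproject1 |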
| type PipelineQuery struct { |
| Pagination |
| Status string `form:"status"` |
| Pending int `form:"pending"` |
| BlueprintId uint64 `uri:"blueprintId" form:"blueprint_id"` |
| Label string `form:"label"` |
| } |
| |
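| // pipelineServiceInit wires up the optional notification service, the |
| // Temporal client (or standalone-mode crash recovery), blueprint cron jobs, |
| // and the background pipeline runner queue |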
| func pipelineServiceInit() { |
| // notification service (optional): enabled when NOTIFICATION_ENDPOINT is set |
| var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT") |
| var notificationSecret = cfg.GetString("NOTIFICATION_SECRET") |
| if strings.TrimSpace(notificationEndpoint) != "" { |
| notificationService = NewNotificationService(notificationEndpoint, notificationSecret) |
| } |
| |
| // temporal client: when TEMPORAL_URL is set, pipelines run as Temporal |
| // workflows and their status is watched; otherwise DevLake runs standalone |
| var temporalUrl = cfg.GetString("TEMPORAL_URL") |
| if temporalUrl != "" { |
| // TODO: logger |
| var err error |
| temporalClient, err = client.NewClient(client.Options{ |
| HostPort: temporalUrl, |
| }) |
| if err != nil { |
| panic(err) |
| } |
| watchTemporalPipelines() |
| } else { |
| // standalone mode: recover from a previous crash by marking any |
| // pipelines/tasks still flagged as running as failed |
| errMsg := "The process was terminated unexpectedly" |
| err := db.UpdateColumns( |
| &models.Pipeline{}, |
| []dal.DalSet{ |
| {ColumnName: "status", Value: models.TASK_FAILED}, |
| {ColumnName: "message", Value: errMsg}, |
| }, |
| dal.Where("status = ?", models.TASK_RUNNING), |
| ) |
| if err != nil { |
| panic(err) |
| } |
| err = db.UpdateColumns( |
| &models.Task{}, |
| []dal.DalSet{ |
| {ColumnName: "status", Value: models.TASK_FAILED}, |
| {ColumnName: "message", Value: errMsg}, |
| }, |
| dal.Where("status = ?", models.TASK_RUNNING), |
| ) |
| if err != nil { |
| panic(err) |
| } |
| } |
| |
| err := ReloadBlueprints(cronManager) |
| if err != nil { |
| panic(err) |
| } |
| |
| var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL") |
| if pipelineMaxParallel < 0 { |
| panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL must be a non-negative integer`)) |
| } |
| if pipelineMaxParallel == 0 { |
| globalPipelineLog.Warn(nil, `PIPELINE_MAX_PARALLEL=0 means no concurrency limit; defaulting to 10000`) |
| pipelineMaxParallel = 10000 |
| } |
| // run pipelines in an independent goroutine |
| go RunPipelineInQueue(pipelineMaxParallel) |
| } |
| |
| // CreatePipeline creates a new pipeline record and returns the model |
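| // |
| // A minimal usage sketch (illustrative; see models.NewPipeline for the |
| // authoritative field list): |
| // |
| //	pipeline, err := CreatePipeline(&models.NewPipeline{Name: "nightly sync"}) |
| //	if err != nil { |
| //		// handle the error |
| //	} |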
| func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, errors.Error) { |
| pipeline, err := CreateDbPipeline(newPipeline) |
| if err != nil { |
| return nil, errors.Convert(err) |
| } |
| return pipeline, nil |
| } |
| |
| // GetPipelines returns the pipelines matching the query, along with the total count |
| func GetPipelines(query *PipelineQuery) ([]*models.Pipeline, int64, errors.Error) { |
| pipelines, i, err := GetDbPipelines(query) |
| if err != nil { |
| return nil, 0, errors.Convert(err) |
| } |
| for _, p := range pipelines { |
| err = fillPipelineDetail(p) |
| if err != nil { |
| return nil, 0, err |
| } |
| } |
| return pipelines, i, nil |
| } |
| |
| // GetPipeline returns the pipeline with the given id, with its details filled in |
| func GetPipeline(pipelineId uint64) (*models.Pipeline, errors.Error) { |
| dbPipeline, err := GetDbPipeline(pipelineId) |
| if err != nil { |
| return nil, err |
| } |
| err = fillPipelineDetail(dbPipeline) |
| if err != nil { |
| return nil, err |
| } |
| return dbPipeline, nil |
| } |
| |
| // GetPipelineLogsArchivePath creates an archive for the logs of this pipeline and returns its file path |
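| // (the archive is written to os.TempDir()/<uuid>/logging.tar.gz) |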
| func GetPipelineLogsArchivePath(pipeline *models.Pipeline) (string, errors.Error) { |
| logPath, err := getPipelineLogsPath(pipeline) |
| if err != nil { |
| return "", err |
| } |
| archive := fmt.Sprintf("%s/%s/logging.tar.gz", os.TempDir(), uuid.New()) |
| if err = utils.CreateGZipArchive(archive, fmt.Sprintf("%s/*", logPath)); err != nil { |
| return "", err |
| } |
| return archive, nil |
| } |
| |
| // RunPipelineInQueue queries pending pipelines from the database and runs them, |
| // bounded by a semaphore of pipelineMaxParallel slots |
| func RunPipelineInQueue(pipelineMaxParallel int64) { |
| sema := semaphore.NewWeighted(pipelineMaxParallel) |
| runningParallelLabels := []string{} |
| var runningParallelLabelLock sync.Mutex |
| for { |
| globalPipelineLog.Info("acquire lock") |
| // start goroutine when sema lock ready and pipeline exist. |
| // to avoid read old pipeline, acquire lock before read exist pipeline |
| err := sema.Acquire(context.TODO(), 1) |
| if err != nil { |
| panic(err) |
| } |
| globalPipelineLog.Info("get lock and wait next pipeline") |
| dbPipeline := &models.Pipeline{} |
| for { |
| cronLocker.Lock() |
| // prepare query to find an appropriate pipeline to execute |
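| // the query below is roughly equivalent to this SQL (illustrative): |
| // |
| //	SELECT id FROM _devlake_pipelines |
| //	LEFT JOIN _devlake_pipeline_labels |
| //	  ON _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id |
| //	  AND name LIKE 'parallel/%' AND name IN (<runningParallelLabels>) |
| //	WHERE status IN ('TASK_CREATED', 'TASK_RERUN') |
| //	GROUP BY id HAVING count(_devlake_pipeline_labels.name) = 0 |
| //	ORDER BY id ASC LIMIT 1 |
| // |
| // i.e. pick the oldest pending pipeline whose parallel labels do not collide |
| // with those of any pipeline that is currently running |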
| err := db.First(dbPipeline, |
| dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN}), |
| dal.Join( |
| `left join _devlake_pipeline_labels ON |
| _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND |
| _devlake_pipeline_labels.name LIKE 'parallel/%' AND |
| _devlake_pipeline_labels.name in ?`, |
| runningParallelLabels, |
| ), |
| dal.Groupby("id"), |
| dal.Having("count(_devlake_pipeline_labels.name)=0"), |
| dal.Select("id"), |
| dal.Orderby("id ASC"), |
| dal.Limit(1), |
| ) |
| cronLocker.Unlock() |
| if err == nil { |
| // next pipeline found |
| break |
| } |
| if !db.IsErrorNotFound(err) { |
| // log unexpected err |
| globalPipelineLog.Error(err, "dequeue failed") |
| } |
| time.Sleep(time.Second) |
| } |
| |
| // mark the pipeline running |
| err = db.UpdateColumns(&models.Pipeline{}, []dal.DalSet{ |
| {ColumnName: "status", Value: models.TASK_RUNNING}, |
| {ColumnName: "message", Value: ""}, |
| {ColumnName: "began_at", Value: time.Now()}, |
| }, dal.Where("id = ?", dbPipeline.ID)) |
| if err != nil { |
| panic(err) |
| } |
| |
| // add pipelineParallelLabels to runningParallelLabels |
| var pipelineParallelLabels []string |
| err = fillPipelineDetail(dbPipeline) |
| if err != nil { |
| panic(err) |
| } |
| for _, dbLabel := range dbPipeline.Labels { |
| if strings.HasPrefix(dbLabel, `parallel/`) { |
| pipelineParallelLabels = append(pipelineParallelLabels, dbLabel) |
| } |
| } |
| runningParallelLabelLock.Lock() |
| runningParallelLabels = append(runningParallelLabels, pipelineParallelLabels...) |
| runningParallelLabelLock.Unlock() |
| |
| go func(pipelineId uint64, parallelLabels []string) { |
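| // on completion: release the semaphore slot and retire this pipeline's parallel labels |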
| defer sema.Release(1) |
| defer func() { |
| runningParallelLabelLock.Lock() |
| runningParallelLabels = utils.SliceRemove(runningParallelLabels, parallelLabels...) |
| runningParallelLabelLock.Unlock() |
| globalPipelineLog.Info("finish pipeline #%d, now runningParallelLabels is %s", pipelineId, runningParallelLabels) |
| }() |
| globalPipelineLog.Info("run pipeline, %d, now running runningParallelLabels are %s", pipelineId, runningParallelLabels) |
| err = runPipeline(pipelineId) |
| if err != nil { |
| globalPipelineLog.Error(err, "failed to run pipeline %d", pipelineId) |
| } |
| }(dbPipeline.ID, pipelineParallelLabels) |
| } |
| } |
| |
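| // watchTemporalPipelines polls Temporal every 3 seconds, syncing the status, |
| // error message, and task progress of running pipelines back to the database |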
| func watchTemporalPipelines() { |
| ticker := time.NewTicker(3 * time.Second) |
| dc := converter.GetDefaultDataConverter() |
| go func() { |
| // run forever |
| for range ticker.C { |
| // load all running pipeline from database |
| runningDbPipelines := make([]models.Pipeline, 0) |
| err := db.All(&runningDbPipelines, dal.Where("status = ?", models.TASK_RUNNING)) |
| if err != nil { |
| panic(err) |
| } |
| // progressDetails is only accessed within this goroutine, |
| // so no locking is needed |
| progressDetails := make(map[uint64]*models.TaskProgressDetail) |
| // check their status against temporal |
| for _, rp := range runningDbPipelines { |
| workflowId := getTemporalWorkflowId(rp.ID) |
| desc, err := temporalClient.DescribeWorkflowExecution( |
| context.Background(), |
| workflowId, |
| "", |
| ) |
| if err != nil { |
| globalPipelineLog.Error(err, "failed to query workflow execution: %v", err) |
| continue |
| } |
| // the workflow is no longer running: completed, failed, or terminated externally |
| s := desc.WorkflowExecutionInfo.Status |
| if s != v11.WORKFLOW_EXECUTION_STATUS_RUNNING { |
| rp.Status = models.TASK_COMPLETED |
| if s != v11.WORKFLOW_EXECUTION_STATUS_COMPLETED { |
| rp.Status = models.TASK_FAILED |
| // get error message |
| hisIter := temporalClient.GetWorkflowHistory( |
| context.Background(), |
| workflowId, |
| "", |
| false, |
| v11.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT, |
| ) |
| for hisIter.HasNext() { |
| his, err := hisIter.Next() |
| if err != nil { |
| globalPipelineLog.Error(err, "failed to get next from workflow history iterator: %v", err) |
| continue |
| } |
| rp.Message = fmt.Sprintf("temporal event type: %v", his.GetEventType()) |
| } |
| } |
| rp.FinishedAt = desc.WorkflowExecutionInfo.CloseTime |
| err = db.UpdateColumns(rp, []dal.DalSet{ |
| {ColumnName: "status", Value: rp.Status}, |
| {ColumnName: "message", Value: rp.Message}, |
| {ColumnName: "finished_at", Value: rp.FinishedAt}, |
| }) |
| if err != nil { |
| globalPipelineLog.Error(err, "failed to update db: %v", err) |
| } |
| continue |
| } |
| |
| // check pending activity |
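| // each pending activity corresponds to a DevLake task; its most recent |
| // heartbeat payload carries a serialized TaskProgressDetail |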
| for _, activity := range desc.PendingActivities { |
| taskId, err := getTaskIdFromActivityId(activity.ActivityId) |
| if err != nil { |
| globalPipelineLog.Error(err, "unable to extract task id from activity id `%s`", activity.ActivityId) |
| continue |
| } |
| progressDetail := &models.TaskProgressDetail{} |
| progressDetails[taskId] = progressDetail |
| heartbeats := activity.GetHeartbeatDetails() |
| if heartbeats == nil { |
| continue |
| } |
| payloads := heartbeats.GetPayloads() |
| if len(payloads) == 0 { |
| // note: continue, not return; returning here would exit the watcher goroutine entirely |
| continue |
| } |
| lastPayload := payloads[len(payloads)-1] |
| if err := dc.FromPayload(lastPayload, progressDetail); err != nil { |
| globalPipelineLog.Error(err, "failed to unmarshal heartbeat payload: %v", err) |
| continue |
| } |
| } |
| } |
| runningTasks.setAll(progressDetails) |
| } |
| }() |
| } |
| |
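| // getTemporalWorkflowId maps a pipeline ID to its Temporal workflow ID, |
| // e.g. "pipeline #123" |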
| func getTemporalWorkflowId(pipelineId uint64) string { |
| return fmt.Sprintf("pipeline #%d", pipelineId) |
| } |
| |
| // NotifyExternal notifies the configured external endpoint (if any) that a |
| // pipeline's status has changed |
| func NotifyExternal(pipelineId uint64) errors.Error { |
| if notificationService == nil { |
| return nil |
| } |
| // send notification to an external web endpoint |
| pipeline, err := GetPipeline(pipelineId) |
| if err != nil { |
| return err |
| } |
| err = notificationService.PipelineStatusChanged(PipelineNotification{ |
| PipelineID: pipeline.ID, |
| CreatedAt: pipeline.CreatedAt, |
| UpdatedAt: pipeline.UpdatedAt, |
| BeganAt: pipeline.BeganAt, |
| FinishedAt: pipeline.FinishedAt, |
| Status: pipeline.Status, |
| }) |
| if err != nil { |
| globalPipelineLog.Error(err, "failed to send notification: %v", err) |
| return err |
| } |
| return nil |
| } |
| |
| // CancelPipeline cancels a pipeline: a pending pipeline is simply marked |
| // cancelled, while a running one is cancelled via Temporal or by cancelling |
| // its pending tasks |
| func CancelPipeline(pipelineId uint64) errors.Error { |
| // prevent RunPipelineInQueue from consuming pending pipelines |
| cronLocker.Lock() |
| defer cronLocker.Unlock() |
| pipeline := &models.Pipeline{} |
| err := db.First(pipeline, dal.Where("id = ?", pipelineId)) |
| if err != nil { |
| return errors.BadInput.New("pipeline not found") |
| } |
| if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN { |
| pipeline.Status = models.TASK_CANCELLED |
| err = db.Update(pipeline) |
| if err != nil { |
| return errors.Default.Wrap(err, "faile to update pipeline") |
| } |
| // now, with RunPipelineInQueue blocked and the target pipeline updated, |
| // the related tasks should be updated as well |
| err = db.UpdateColumn( |
| &models.Task{}, |
| "status", models.TASK_CANCELLED, |
| dal.Where("pipeline_id = ?", pipelineId), |
| ) |
| if err != nil { |
| return errors.Default.Wrap(err, "faile to update pipeline tasks") |
| } |
| // the target pipeline was pending, not running, so no actual cancel operation is needed |
| return nil |
| } |
| if temporalClient != nil { |
| return errors.Convert(temporalClient.CancelWorkflow(context.Background(), getTemporalWorkflowId(pipelineId), "")) |
| } |
| pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}}) |
| if err != nil { |
| return errors.Convert(err) |
| } |
| if count == 0 { |
| return nil |
| } |
| for _, pendingTask := range pendingTasks { |
| _ = CancelTask(pendingTask.ID) |
| } |
| return errors.Convert(err) |
| } |
| |
| // getPipelineLogsPath gets the logs directory of this pipeline |
| func getPipelineLogsPath(pipeline *models.Pipeline) (string, errors.Error) { |
| pipelineLog := GetPipelineLogger(pipeline) |
| path := filepath.Dir(pipelineLog.GetConfig().Path) |
| _, err := os.Stat(path) |
| if err == nil { |
| return path, nil |
| } |
| if os.IsNotExist(err) { |
| return "", errors.NotFound.Wrap(err, fmt.Sprintf("logs for pipeline #%d not found", pipeline.ID)) |
| } |
| return "", errors.Default.Wrap(err, fmt.Sprintf("error validating logs path for pipeline #%d", pipeline.ID)) |
| } |
| |
| // RerunPipeline reruns all failed tasks of a pipeline, or only the specified task |
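| // |
| // For example (illustrative): RerunPipeline(42, nil) reruns every failed task |
| // of pipeline #42, while RerunPipeline(42, task) reruns just that one task |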
| func RerunPipeline(pipelineId uint64, task *models.Task) ([]*models.Task, errors.Error) { |
| // prevent pipeline executor from doing anything that might jeopardize the integrity |
| cronLocker.Lock() |
| defer cronLocker.Unlock() |
| |
| // load the pipeline |
| pipeline, err := GetPipeline(pipelineId) |
| if err != nil { |
| return nil, err |
| } |
| |
| // verify the status |
| if pipeline.Status == models.TASK_RUNNING { |
| return nil, errors.BadInput.New("pipeline is running") |
| } |
| if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN { |
| return nil, errors.BadInput.New("pipeline is waiting to run") |
| } |
| |
| // determine which tasks to rerun |
| var failedTasks []*models.Task |
| if task != nil { |
| if task.PipelineId != pipelineId { |
| return nil, errors.BadInput.New("the task ID and pipeline ID doesn't match") |
| } |
| failedTasks = append(failedTasks, task) |
| } else { |
| tasks, err := GetTasksWithLastStatus(pipelineId) |
| if err != nil { |
| return nil, errors.Default.Wrap(err, "error getting tasks") |
| } |
| for _, t := range tasks { |
| if t.Status != models.TASK_COMPLETED { |
| failedTasks = append(failedTasks, t) |
| } |
| } |
| } |
| |
| // no tasks to rerun |
| if len(failedTasks) == 0 { |
| return nil, errors.BadInput.New("no tasks to be re-ran") |
| } |
| |
| // create new tasks |
| // TODO: this is better to be wrapped inside a transaction |
| rerunTasks := []*models.Task{} |
| for _, t := range failedTasks { |
| // mark previous task failed |
| t.Status = models.TASK_FAILED |
| err := db.UpdateColumn(t, "status", models.TASK_FAILED) |
| if err != nil { |
| return nil, err |
| } |
| // create new task |
| subtasks, err := t.GetSubTasks() |
| if err != nil { |
| return nil, err |
| } |
| options, err := t.GetOptions() |
| if err != nil { |
| return nil, err |
| } |
| rerunTask, err := CreateTask(&models.NewTask{ |
| PipelineTask: &plugin.PipelineTask{ |
| Plugin: t.Plugin, |
| Subtasks: subtasks, |
| Options: options, |
| }, |
| PipelineId: t.PipelineId, |
| PipelineRow: t.PipelineRow, |
| PipelineCol: t.PipelineCol, |
| IsRerun: true, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| // append to result |
| rerunTasks = append(rerunTasks, rerunTask) |
| } |
| |
| // mark the pipeline for rerun |
| err = db.UpdateColumn(&models.Pipeline{}, |
| "status", models.TASK_RERUN, |
| dal.Where("id = ?", pipelineId), |
| ) |
| if err != nil { |
| return nil, err |
| } |
| return rerunTasks, nil |
| } |