| /* |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package services |
| |
| import ( |
| "context" |
| "fmt" |
| "math" |
| "strings" |
| |
| "github.com/apache/incubator-devlake/core/dal" |
| "github.com/apache/incubator-devlake/core/errors" |
| "github.com/apache/incubator-devlake/core/log" |
| "github.com/apache/incubator-devlake/core/models" |
| "github.com/apache/incubator-devlake/impls/logruslog" |
| ) |
| |
| var taskLog = logruslog.Global.Nested("task service") |
| |
| // TaskQuery FIXME . |
| type TaskQuery struct { |
| Pagination |
| Status string `form:"status"` |
| Plugin string `form:"plugin"` |
| PipelineId uint64 `form:"pipelineId" uri:"pipelineId"` |
| Pending int `form:"pending"` |
| } |
| |
| func createTask(newTask *models.NewTask, tx dal.Transaction) (*models.Task, errors.Error) { |
| task := &models.Task{ |
| Plugin: newTask.Plugin, |
| Subtasks: newTask.Subtasks, |
| Options: newTask.Options, |
| Status: models.TASK_CREATED, |
| Message: "", |
| PipelineId: newTask.PipelineId, |
| PipelineRow: newTask.PipelineRow, |
| PipelineCol: newTask.PipelineCol, |
| } |
| if newTask.IsRerun { |
| task.Status = models.TASK_RERUN |
| } |
| err := tx.Create(task) |
| if err != nil { |
| taskLog.Error(err, "save task failed") |
| return nil, errors.Internal.Wrap(err, "save task failed") |
| } |
| return task, nil |
| } |
| |
| // GetTasks returns paginated tasks that match the given query |
| func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) { |
| // verify query |
| if err := VerifyStruct(query); err != nil { |
| return nil, 0, err |
| } |
| |
| // construct common query clauses |
| clauses := []dal.Clause{dal.From(&models.Task{})} |
| if query.Status != "" { |
| clauses = append(clauses, dal.Where("status = ?", query.Status)) |
| } |
| if query.Plugin != "" { |
| clauses = append(clauses, dal.Where("plugin = ?", query.Plugin)) |
| } |
| if query.PipelineId > 0 { |
| clauses = append(clauses, dal.Where("pipeline_id = ?", query.PipelineId)) |
| } |
| if query.Pending > 0 { |
| clauses = append(clauses, dal.Where("finished_at is null")) |
| } |
| |
| // count total records |
| count, err := db.Count(clauses...) |
| if err != nil { |
| return nil, 0, err |
| } |
| |
| // load paginated records from db |
| clauses = append(clauses, |
| dal.Orderby("id DESC"), |
| dal.Offset(query.GetSkip()), |
| dal.Limit(query.GetPageSizeOr(10000)), |
| ) |
| tasks := make([]*models.Task, 0) |
| err = db.All(&tasks, clauses...) |
| if err != nil { |
| return nil, count, err |
| } |
| |
| // fill running information |
| runningTasks.FillProgressDetailToTasks(tasks) |
| |
| return tasks, count, nil |
| } |
| |
| // GetTasksWithLastStatus returns task list of the pipeline, only the most recently tasks would be returned |
| // TODO: adopts GetLatestTasksOfPipeline |
| func GetTasksWithLastStatus(pipelineId uint64, shouldSanitize bool, tx dal.Dal) ([]*models.Task, errors.Error) { |
| if tx == nil { |
| tx = db |
| } |
| var tasks []*models.Task |
| err := tx.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC")) |
| if err != nil { |
| return nil, err |
| } |
| taskIds := make(map[int64]struct{}) |
| var result []*models.Task |
| var maxRow, maxCol int |
| for _, task := range tasks { |
| if task.PipelineRow > maxRow { |
| maxRow = task.PipelineRow |
| } |
| if task.PipelineCol > maxCol { |
| maxCol = task.PipelineCol |
| } |
| } |
| |
| for _, task := range tasks { |
| index := int64(task.PipelineRow)*int64(maxCol) + int64(task.PipelineCol) |
| if shouldSanitize { |
| taskOption, err := SanitizePluginOption(task.Plugin, task.Options) |
| if err != nil { |
| return nil, errors.Convert(err) |
| } |
| task.Options = taskOption |
| } |
| if _, ok := taskIds[index]; !ok { |
| taskIds[index] = struct{}{} |
| result = append(result, task) |
| } |
| } |
| |
| runningTasks.FillProgressDetailToTasks(result) |
| return result, nil |
| } |
| |
| // GetTask FIXME ... |
| func GetTask(taskId uint64) (*models.Task, errors.Error) { |
| task := &models.Task{} |
| err := db.First(task, dal.Where("id = ?", taskId)) |
| if err != nil { |
| if db.IsErrorNotFound(err) { |
| return nil, errors.NotFound.New("task not found") |
| } |
| return nil, errors.Internal.Wrap(err, "error getting the task from database") |
| } |
| return task, nil |
| } |
| |
| // CancelTask FIXME ... |
| func CancelTask(taskId uint64) errors.Error { |
| cancel, err := runningTasks.Remove(taskId) |
| if err != nil { |
| return err |
| } |
| cancel() |
| return nil |
| } |
| |
| // RunTasksStandalone run tasks in parallel |
| func RunTasksStandalone(parentLogger log.Logger, taskIds []uint64) errors.Error { |
| if len(taskIds) == 0 { |
| return nil |
| } |
| results := make(chan error) |
| for _, taskId := range taskIds { |
| go func(id uint64) { |
| taskLog.Info("run task #%d in background ", id) |
| var err errors.Error |
| taskErr := runTaskStandalone(parentLogger, id) |
| if taskErr != nil { |
| err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", id)) |
| } |
| results <- err |
| }(taskId) |
| } |
| errs := make([]error, 0) |
| var err error |
| finished := 0 |
| for err = range results { |
| if err != nil { |
| taskLog.Error(err, "task failed") |
| errs = append(errs, err) |
| } |
| finished++ |
| if finished == len(taskIds) { |
| close(results) |
| } |
| } |
| if len(errs) > 0 { |
| var sb strings.Builder |
| for _, e := range errs { |
| _, _ = sb.WriteString(e.Error()) |
| _, _ = sb.WriteString("\n") |
| if errors.Is(e, context.Canceled) { |
| parentLogger.Info("task canceled") |
| return errors.Convert(e) |
| } |
| } |
| err = errors.Default.New(sb.String()) |
| } |
| return errors.Convert(err) |
| } |
| |
| // RerunTask reruns specified task |
| func RerunTask(taskId uint64) (*models.Task, errors.Error) { |
| task, err := GetTask(taskId) |
| if err != nil { |
| return nil, err |
| } |
| rerunTasks, err := RerunPipeline(task.PipelineId, task) |
| if err != nil { |
| return nil, err |
| } |
| rerunTask := rerunTasks[0] |
| taskOption, sanitizePluginOptionErr := SanitizePluginOption(rerunTask.Plugin, rerunTask.Options) |
| if sanitizePluginOptionErr != nil { |
| return nil, errors.Convert(err) |
| } |
| rerunTask.Options = taskOption |
| return rerunTask, nil |
| } |
| |
| // GetSubTasksInfo returns subtask list of the pipeline, only the most recently subtasks would be returned |
| func GetSubTasksInfo(pipelineId uint64, shouldSanitize bool, tx dal.Dal) (*models.SubTasksOuput, errors.Error) { |
| if tx == nil { |
| tx = db |
| } |
| var tasks []*models.Task |
| err := tx.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id")) |
| if err != nil { |
| return nil, err |
| } |
| |
| lastTasks := filterTasksWithLastStatus(tasks) |
| var subtasksInfo []models.SubtasksInfo |
| var totalSubtasksCount int64 |
| var totalFinishedSubTasksCount int64 |
| var count int64 |
| var status []string |
| for _, task := range lastTasks { |
| if task.Plugin == "org" { |
| continue |
| } |
| subTaskResult := models.SubtasksInfo{ |
| ID: task.ID, |
| PipelineID: task.PipelineId, |
| CreatedAt: task.CreatedAt, |
| UpdatedAt: task.UpdatedAt, |
| BeganAt: task.BeganAt, |
| FinishedAt: task.FinishedAt, |
| Plugin: task.Plugin, |
| Status: task.Status, |
| Message: task.Message, |
| ErrorName: task.ErrorName, |
| SpentSeconds: task.SpentSeconds, |
| } |
| if shouldSanitize { |
| taskOption, err := SanitizePluginOption(task.Plugin, task.Options) |
| if err != nil { |
| return nil, errors.Convert(err) |
| } |
| subTaskResult.Options = taskOption |
| } |
| |
| subtasks := []*models.Subtask{} |
| err = tx.All(&subtasks, dal.Where("task_id = ?", task.ID)) |
| if err != nil { |
| return nil, err |
| } |
| for _, subtask := range subtasks { |
| t := &models.SubtaskDetails{ |
| ID: subtask.ID, |
| CreatedAt: subtask.CreatedAt, |
| UpdatedAt: subtask.UpdatedAt, |
| TaskID: subtask.TaskID, |
| Name: subtask.Name, |
| Number: subtask.Number, |
| BeganAt: subtask.BeganAt, |
| FinishedAt: subtask.FinishedAt, |
| SpentSeconds: subtask.SpentSeconds, |
| FinishedRecords: subtask.FinishedRecords, |
| Sequence: subtask.Sequence, |
| IsCollector: subtask.IsCollector, |
| IsFailed: subtask.IsFailed, |
| Message: subtask.Message, |
| } |
| subTaskResult.SubtaskDetails = append(subTaskResult.SubtaskDetails, t) |
| } |
| subtasksInfo = append(subtasksInfo, subTaskResult) |
| |
| collectSubtasksCount := errors.Must1(tx.Count(dal.From("_devlake_subtasks"), dal.Where("task_id = ? and is_collector = 1", task.ID))) |
| totalSubtasksCount += collectSubtasksCount |
| finishedSubTasksCount := errors.Must1(tx.Count(dal.From("_devlake_subtasks"), dal.Where("task_id = ? and is_collector = 1 and finished_at is not null", task.ID))) |
| totalFinishedSubTasksCount += finishedSubTasksCount |
| count++ |
| |
| status = append(status, task.Status) |
| } |
| |
| subTasksOuput := &models.SubTasksOuput{} |
| |
| subTasksOuput.SubtasksInfo = subtasksInfo |
| subTasksOuput.Count = totalSubtasksCount |
| completionRateFloat := float64(totalFinishedSubTasksCount) / float64(totalSubtasksCount) |
| roundedCompletionRate := math.Round(completionRateFloat*100) / 100 |
| subTasksOuput.CompletionRate = roundedCompletionRate |
| |
| subTasksOuput.Status = getTaskStatus(status) |
| subTasksOuput.Count = count |
| |
| return subTasksOuput, nil |
| } |
| |
| // filterTasksWithLastStatus returns the latest task for each plugin |
| func filterTasksWithLastStatus(tasks []*models.Task) []*models.Task { |
| taskMap := make(map[string]*models.Task) |
| for _, task := range tasks { |
| key := fmt.Sprintf("%d-%d-%d", task.PipelineId, task.PipelineRow, task.PipelineCol) |
| if existingTask, ok := taskMap[key]; ok { |
| if task.BeganAt != nil && (existingTask.BeganAt == nil || task.BeganAt.After(*existingTask.BeganAt)) { |
| taskMap[key] = task |
| } |
| } else { |
| taskMap[key] = task |
| } |
| } |
| |
| var filteredTasks []*models.Task |
| for _, task := range tasks { |
| key := fmt.Sprintf("%d-%d-%d", task.PipelineId, task.PipelineRow, task.PipelineCol) |
| if filteredTask, ok := taskMap[key]; ok { |
| filteredTasks = append(filteredTasks, filteredTask) |
| } |
| } |
| |
| return filteredTasks |
| } |
| |
| func getTaskStatus(statuses []string) string { |
| var status string |
| if len(statuses) == 0 { |
| return status |
| } |
| |
| failedCount := 0 |
| completedCount := 0 |
| for _, s := range statuses { |
| if s == models.TASK_FAILED { |
| failedCount++ |
| } else if s == models.TASK_COMPLETED { |
| completedCount++ |
| } |
| } |
| if failedCount > 0 && completedCount > 0 { |
| status = "TASK_PARTIAL" |
| } else if failedCount == len(statuses) { |
| status = models.TASK_FAILED |
| } else if completedCount == len(statuses) { |
| status = models.TASK_COMPLETED |
| } else { |
| status = models.TASK_RUNNING |
| } |
| |
| return status |
| } |