blob: c230980b5bca1ef2fafccef437c01565c2c16f96 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
"context"
"encoding/json"
"fmt"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/impls/logruslog"
"regexp"
"strings"
)
var taskLog = logruslog.Global.Nested("task service")
var activityPattern = regexp.MustCompile(`task #(\d+)`)
// TaskQuery FIXME .
type TaskQuery struct {
Pagination
Status string `form:"status"`
Plugin string `form:"plugin"`
PipelineId uint64 `form:"pipelineId" uri:"pipelineId"`
Pending int `form:"pending"`
}
// CreateTask creates a new task
func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) {
b, err := json.Marshal(newTask.Options)
if err != nil {
return nil, errors.Convert(err)
}
s, err := json.Marshal(newTask.Subtasks)
if err != nil {
return nil, errors.Convert(err)
}
task := &models.Task{
Plugin: newTask.Plugin,
Subtasks: s,
Options: string(b),
Status: models.TASK_CREATED,
Message: "",
PipelineId: newTask.PipelineId,
PipelineRow: newTask.PipelineRow,
PipelineCol: newTask.PipelineCol,
}
if newTask.IsRerun {
task.Status = models.TASK_RERUN
}
err = db.Create(task)
if err != nil {
taskLog.Error(err, "save task failed")
return nil, errors.Internal.Wrap(err, "save task failed")
}
return task, nil
}
// GetTasks returns paginated tasks that match the given query
func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) {
// verify query
if err := VerifyStruct(query); err != nil {
return nil, 0, err
}
// construct common query clauses
clauses := []dal.Clause{dal.From(&models.Task{})}
if query.Status != "" {
clauses = append(clauses, dal.Where("status = ?", query.Status))
}
if query.Plugin != "" {
clauses = append(clauses, dal.Where("plugin = ?", query.Plugin))
}
if query.PipelineId > 0 {
clauses = append(clauses, dal.Where("pipeline_id = ?", query.PipelineId))
}
if query.Pending > 0 {
clauses = append(clauses, dal.Where("finished_at is null"))
}
// count total records
count, err := db.Count(clauses...)
if err != nil {
return nil, 0, err
}
// load paginated records from db
clauses = append(clauses,
dal.Orderby("id DESC"),
dal.Offset(query.GetSkip()),
dal.Limit(query.GetPageSizeOr(10000)),
)
tasks := make([]*models.Task, 0)
err = db.All(&tasks, clauses...)
if err != nil {
return nil, count, err
}
// fill running information
runningTasks.FillProgressDetailToTasks(tasks)
return tasks, count, nil
}
// GetTasksWithLastStatus returns task list of the pipeline, only the most recently tasks would be returned
// TODO: adopts GetLatestTasksOfPipeline
func GetTasksWithLastStatus(pipelineId uint64) ([]*models.Task, errors.Error) {
var tasks []*models.Task
err := db.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC"))
if err != nil {
return nil, err
}
taskIds := make(map[int64]struct{})
var result []*models.Task
var maxRow, maxCol int
for _, task := range tasks {
if task.PipelineRow > maxRow {
maxRow = task.PipelineRow
}
if task.PipelineCol > maxCol {
maxCol = task.PipelineCol
}
}
for _, task := range tasks {
index := int64(task.PipelineRow)*int64(maxCol) + int64(task.PipelineCol)
if _, ok := taskIds[index]; !ok {
taskIds[index] = struct{}{}
result = append(result, task)
}
}
runningTasks.FillProgressDetailToTasks(result)
return result, nil
}
// GetTask FIXME ...
func GetTask(taskId uint64) (*models.Task, errors.Error) {
task := &models.Task{}
err := db.First(task, dal.Where("id = ?", taskId))
if err != nil {
if db.IsErrorNotFound(err) {
return nil, errors.NotFound.New("task not found")
}
return nil, errors.Internal.Wrap(err, "error getting the task from database")
}
return task, nil
}
// CancelTask FIXME ...
func CancelTask(taskId uint64) errors.Error {
cancel, err := runningTasks.Remove(taskId)
if err != nil {
return err
}
cancel()
return nil
}
// RunTasksStandalone run tasks in parallel
func RunTasksStandalone(parentLogger log.Logger, taskIds []uint64) errors.Error {
if len(taskIds) == 0 {
return nil
}
results := make(chan error)
for _, taskId := range taskIds {
go func(id uint64) {
taskLog.Info("run task #%d in background ", id)
var err errors.Error
taskErr := runTaskStandalone(parentLogger, id)
if taskErr != nil {
err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", id))
}
results <- err
}(taskId)
}
errs := make([]error, 0)
var err error
finished := 0
for err = range results {
if err != nil {
taskLog.Error(err, "task failed")
errs = append(errs, err)
}
finished++
if finished == len(taskIds) {
close(results)
}
}
if len(errs) > 0 {
var sb strings.Builder
for _, e := range errs {
_, _ = sb.WriteString(e.Error())
_, _ = sb.WriteString("\n")
if errors.Is(e, context.Canceled) {
parentLogger.Info("task canceled")
return errors.Convert(e)
}
}
err = errors.Default.New(sb.String())
}
return errors.Convert(err)
}
// RerunTask reruns specified task
func RerunTask(taskId uint64) (*models.Task, errors.Error) {
task, err := GetTask(taskId)
if err != nil {
return nil, err
}
rerunTasks, err := RerunPipeline(task.PipelineId, task)
if err != nil {
return nil, err
}
return rerunTasks[0], nil
}