blob: 835a07bc707a3ffc2bc5b4f0506a01689f25a8c9 [file] [log] [blame]
package services
import (
"encoding/json"
"fmt"
"github.com/merico-dev/lake/errors"
"github.com/merico-dev/lake/logger"
"github.com/merico-dev/lake/models"
"github.com/merico-dev/lake/plugins"
)
const (
TASK_CREATED = "TASK_CREATED"
TASK_COMPLETED = "TASK_COMPLETED"
TASK_FAILED = "TASK_FAILED"
)
type NewTask struct {
// Plugin name
Plugin string `json:"plugin" binding:"required"`
// Options for the plugin task to be triggered
Options map[string]interface{} `json:"options" binding:"required"`
}
func init() {
models.Db.Model(&models.Task{}).Where("status != ?", TASK_COMPLETED).Update("status", TASK_FAILED)
}
func CreateTask(data NewTask) (*models.Task, error) {
b, err := json.Marshal(data.Options)
if err != nil {
return nil, err
}
task := models.Task{
Plugin: data.Plugin,
Options: b,
Status: TASK_CREATED,
Message: "",
}
err = models.Db.Save(&task).Error
if err != nil {
logger.Error("Database error", err)
return nil, errors.InternalError
}
// trigger plugins
data.Options["ID"] = task.ID
go func() {
progress := make(chan float32)
go func() {
err = plugins.RunPlugin(task.Plugin, data.Options, progress)
if err != nil {
logger.Error("Task error", err)
task.Status = TASK_FAILED
task.Message = err.Error()
}
err := models.Db.Save(&task).Error
if err != nil {
logger.Error("Database error", err)
}
}()
for p := range progress {
fmt.Printf("running plugin %v, progress: %v\n", task.Plugin, p*100)
task.Progress = p
models.Db.Save(&task)
}
task.Status = TASK_COMPLETED
err := models.Db.Save(&task).Error
if err != nil {
logger.Error("Database error", err)
}
}()
return &task, nil
}
func GetTasks(status string) ([]models.Task, error) {
db := models.Db
if status != "" {
db = db.Where("status = ?", status)
}
tasks := make([]models.Task, 0)
err := db.Find(&tasks).Error
if err != nil {
return nil, err
}
return tasks, nil
}