blob: a4c21ad8e5535e3728ae3f78517f18b8953b263d [file] [log] [blame]
package services
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/merico-dev/lake/errors"
"github.com/merico-dev/lake/logger"
"github.com/merico-dev/lake/models"
"github.com/merico-dev/lake/runner"
"github.com/merico-dev/lake/worker/app"
v11 "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
)
// Package-level state shared by the pipeline service functions in this file.
var (
	// notificationService remains nil unless pipelineServiceInit finds a
	// non-blank NOTIFICATION_ENDPOINT; NotifyExternal does nothing while it is nil.
	notificationService *NotificationService

	// temporalClient remains nil unless pipelineServiceInit finds a TEMPORAL_URL;
	// RunPipeline and CancelPipeline take the standalone path while it is nil.
	temporalClient client.Client

	// pipelineLog is the logger used throughout the pipeline service.
	pipelineLog = logger.Global.Nested("pipeline service")
)
// PipelineQuery carries the filter and pagination options consumed by
// GetPipelines. The `form` tags give the external parameter name of each field.
type PipelineQuery struct {
// Status, when non-empty, keeps only pipelines whose status equals it exactly.
Status string `form:"status"`
// Pending, when greater than zero, keeps only pipelines whose finished_at is null.
Pending int `form:"pending"`
// Page is the 1-based page number; pagination is applied only when both Page
// and PageSize are greater than zero.
Page int `form:"page"`
// PageSize is the number of pipelines per page.
PageSize int `form:"pageSize"`
}
// pipelineServiceInit configures the pipeline service from cfg:
//
//   - creates the notification service when NOTIFICATION_ENDPOINT is non-blank;
//   - when TEMPORAL_URL is set, connects the Temporal client and starts the
//     background watcher; otherwise (standalone mode) marks every pipeline and
//     task that is not completed as failed;
//   - reloads blueprints into the cron manager.
//
// It panics if the Temporal client cannot be created or if blueprints cannot be
// reloaded. A failure of the standalone status reset is logged, not fatal.
func pipelineServiceInit() {
	// notification
	notificationEndpoint := cfg.GetString("NOTIFICATION_ENDPOINT")
	notificationSecret := cfg.GetString("NOTIFICATION_SECRET")
	if strings.TrimSpace(notificationEndpoint) != "" {
		notificationService = NewNotificationService(notificationEndpoint, notificationSecret)
	}

	// temporal client
	temporalUrl := cfg.GetString("TEMPORAL_URL")
	if temporalUrl != "" {
		// TODO: logger
		var err error
		temporalClient, err = client.NewClient(client.Options{
			HostPort: temporalUrl,
		})
		if err != nil {
			panic(err)
		}
		watchTemporalPipelines()
	} else {
		// standalone mode: nothing can still be running after a restart, so any
		// pipeline/task that did not complete is flagged as failed.
		err := db.Model(&models.Pipeline{}).
			Where("status <> ?", models.TASK_COMPLETED).
			Update("status", models.TASK_FAILED).Error
		if err != nil {
			pipelineLog.Error("failed to reset pipeline status: %v", err)
		}
		err = db.Model(&models.Task{}).
			Where("status <> ?", models.TASK_COMPLETED).
			Update("status", models.TASK_FAILED).Error
		if err != nil {
			pipelineLog.Error("failed to reset task status: %v", err)
		}
	}

	if err := ReloadBlueprints(cronManager); err != nil {
		panic(err)
	}
}
// CreatePipeline stores a new pipeline together with its tasks.
//
// newPipeline.Tasks is a two-dimensional plan: the outer index becomes the
// task's 1-based PipelineRow and the inner index its 1-based PipelineCol.
//
// It returns:
//   - an error ("no task to run") when the plan contains no task at all; this
//     is checked before anything is written so no empty pipeline row is left
//     behind;
//   - errors.InternalError when the pipeline row cannot be inserted or updated;
//   - the error from CreateTask or json.Marshal unchanged.
//
// On success the returned pipeline has TotalTasks and Tasks (the JSON-encoded
// plan) populated and persisted.
func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) {
	// reject an empty plan before touching the database
	plannedTasks := 0
	for i := range newPipeline.Tasks {
		plannedTasks += len(newPipeline.Tasks[i])
	}
	if plannedTasks == 0 {
		return nil, fmt.Errorf("no task to run")
	}

	// create pipeline object from posted data
	pipeline := &models.Pipeline{
		Name:          newPipeline.Name,
		FinishedTasks: 0,
		Status:        models.TASK_CREATED,
		Message:       "",
		SpentSeconds:  0,
	}
	if newPipeline.BlueprintId != 0 {
		pipeline.BlueprintId = newPipeline.BlueprintId
	}

	// save pipeline to database
	err := db.Create(pipeline).Error
	if err != nil {
		pipelineLog.Error("create pipline failed: %w", err)
		return nil, errors.InternalError
	}

	// create tasks accordingly
	for i := range newPipeline.Tasks {
		for j := range newPipeline.Tasks[i] {
			newTask := newPipeline.Tasks[i][j]
			newTask.PipelineId = pipeline.ID
			newTask.PipelineRow = i + 1
			newTask.PipelineCol = j + 1
			if _, err := CreateTask(newTask); err != nil {
				pipelineLog.Error("create task for pipeline failed: %w", err)
				return nil, err
			}
			// sync task state back to pipeline
			pipeline.TotalTasks += 1
		}
	}

	// update tasks state
	pipeline.Tasks, err = json.Marshal(newPipeline.Tasks)
	if err != nil {
		return nil, err
	}
	err = db.Model(pipeline).Updates(map[string]interface{}{
		"total_tasks": pipeline.TotalTasks,
		"tasks":       pipeline.Tasks,
	}).Error
	if err != nil {
		pipelineLog.Error("update pipline state failed: %w", err)
		return nil, errors.InternalError
	}
	return pipeline, nil
}
// GetPipelines lists pipelines, newest id first, narrowed by query.
//
// A non-empty query.Status filters on exact status; query.Pending > 0 keeps
// only pipelines whose finished_at is null. The returned count is the number of
// rows matching the filters before pagination. Limit/offset are applied only
// when both query.Page and query.PageSize are positive (Page is 1-based).
//
// If counting fails the result is (nil, 0, err); if the fetch fails the result
// is (nil, count, err).
func GetPipelines(query *PipelineQuery) ([]*models.Pipeline, int64, error) {
	found := make([]*models.Pipeline, 0)

	// build the filtered, ordered statement
	tx := db.Model(found).Order("id DESC")
	if query.Status != "" {
		tx = tx.Where("status = ?", query.Status)
	}
	if query.Pending > 0 {
		tx = tx.Where("finished_at is null")
	}

	// total is taken before limit/offset are attached
	var total int64
	if err := tx.Count(&total).Error; err != nil {
		return nil, 0, err
	}

	if query.Page > 0 && query.PageSize > 0 {
		skip := query.PageSize * (query.Page - 1)
		tx = tx.Limit(query.PageSize).Offset(skip)
	}

	if err := tx.Find(&found).Error; err != nil {
		return nil, total, err
	}
	return found, total, nil
}
// GetPipeline loads the pipeline whose primary key is pipelineId.
//
// It returns (nil, err) when the query fails. The lookup uses First rather than
// Find so that a missing row is reported as an error instead of silently
// yielding a zero-valued pipeline with a nil error.
func GetPipeline(pipelineId uint64) (*models.Pipeline, error) {
	pipeline := &models.Pipeline{}
	if err := db.First(pipeline, pipelineId).Error; err != nil {
		return nil, err
	}
	return pipeline, nil
}
// RunPipeline executes the pipeline and records the outcome.
//
// The pipeline runs through Temporal when temporalClient is set, otherwise in
// standalone mode. Afterwards the pipeline row is reloaded and its finished_at,
// spent_seconds, status and message columns are updated: TASK_FAILED with the
// error text when the run failed, TASK_COMPLETED with an empty message
// otherwise.
//
// Return value:
//   - if the pipeline cannot be reloaded: the run error when there is one,
//     otherwise the load error (previously a load failure after a successful
//     run was silently reported as nil);
//   - if persisting the outcome fails: that database error;
//   - otherwise the result of NotifyExternal. Note that a failed run whose
//     state was persisted is therefore NOT reported through the return value.
func RunPipeline(pipelineId uint64) error {
	var err error
	// run
	if temporalClient != nil {
		err = runPipelineViaTemporal(pipelineId)
	} else {
		err = runPipelineStandalone(pipelineId)
	}

	// load
	pipeline, loadErr := GetPipeline(pipelineId)
	if loadErr != nil {
		if err != nil {
			return err
		}
		return loadErr
	}

	// finished, update database
	finishedAt := time.Now()
	pipeline.FinishedAt = &finishedAt
	// NOTE(review): BeganAt is dereferenced unconditionally; if it is a pointer
	// that can still be unset at this point this will panic — confirm against
	// the model and the runner.
	pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
	if err != nil {
		pipeline.Status = models.TASK_FAILED
		pipeline.Message = err.Error()
	} else {
		pipeline.Status = models.TASK_COMPLETED
		pipeline.Message = ""
	}
	// Select lists the columns explicitly so zero values (e.g. the empty
	// message) are written too.
	dbe := db.Model(pipeline).Select("finished_at", "spent_seconds", "status", "message").Updates(pipeline).Error
	if dbe != nil {
		pipelineLog.Error("update pipeline state failed: %w", dbe)
		return dbe
	}

	// notify external webhook
	return NotifyExternal(pipelineId)
}
// getTemporalWorkflowId derives the Temporal workflow id for a pipeline:
// the literal prefix "pipeline #" followed by the decimal pipeline id.
func getTemporalWorkflowId(pipelineId uint64) string {
	const prefix = "pipeline #"
	// Sprint inserts no space here because one of the operands is a string.
	return fmt.Sprint(prefix, pipelineId)
}
// runPipelineViaTemporal starts app.DevLakePipelineWorkflow for the pipeline on
// the TEMPORAL_TASK_QUEUE task queue and blocks until the workflow ends.
//
// The workflow receives the JSON-encoded cfg settings and the pipeline id.
// It returns the marshalling error, the error from starting the workflow, or
// the error the workflow finished with; nil means the workflow completed.
func runPipelineViaTemporal(pipelineId uint64) error {
	workflowOpts := client.StartWorkflowOptions{
		ID:        getTemporalWorkflowId(pipelineId),
		TaskQueue: cfg.GetString("TEMPORAL_TASK_QUEUE"),
	}
	// the worker gets the configuration as JSON plus the pipeline id
	configJson, err := json.Marshal(cfg.AllSettings())
	if err != nil {
		return err
	}

	pipelineLog.Info("enqueue pipeline #%d into temporal task queue", pipelineId)
	workflow, err := temporalClient.ExecuteWorkflow(
		context.Background(),
		workflowOpts,
		app.DevLakePipelineWorkflow,
		configJson,
		pipelineId,
	)
	if err != nil {
		pipelineLog.Error("failed to enqueue pipeline #%d into temporal", pipelineId)
		return err
	}

	// block until the workflow is done; a failure is an error-level event and
	// must not be followed by the success message below
	if err = workflow.Get(context.Background(), nil); err != nil {
		pipelineLog.Error("failed to execute pipeline #%d via temporal: %w", pipelineId, err)
		return err
	}
	pipelineLog.Info("pipeline #%d finished by temporal", pipelineId)
	return nil
}
// watchTemporalPipelines starts a background goroutine that, every 3 seconds,
// reconciles pipelines marked TASK_RUNNING in the database with Temporal:
//
//   - a workflow that is no longer running has its pipeline row updated to
//     TASK_COMPLETED (workflow completed) or TASK_FAILED (any other closed
//     status, with the close event type stored as the message), and
//     finished_at set to the workflow close time;
//   - for workflows still running, the last heartbeat payload of every pending
//     activity is decoded into a TaskProgressDetail keyed by task id, and the
//     collected map is handed to runningTasks.setAll.
//
// The goroutine never exits. Errors are logged and the affected item is
// skipped; in particular a database error only skips the current tick instead
// of crashing the process, and an activity without heartbeat payloads only
// skips that activity instead of ending the watcher.
func watchTemporalPipelines() {
	ticker := time.NewTicker(3 * time.Second)
	dc := converter.GetDefaultDataConverter()
	go func() {
		// run forever
		for range ticker.C {
			// load all running pipeline from database
			runningPipelines := make([]models.Pipeline, 0)
			err := db.Find(&runningPipelines, "status = ?", models.TASK_RUNNING).Error
			if err != nil {
				// likely transient: try again on the next tick
				pipelineLog.Error("failed to load running pipelines: %v", err)
				continue
			}
			progressDetails := make(map[uint64]*models.TaskProgressDetail)
			// check their status against temporal
			for i := range runningPipelines {
				// pointer into the slice: avoids copying the row and gives
				// db.Model a pointer like the rest of this file does
				rp := &runningPipelines[i]
				workflowId := getTemporalWorkflowId(rp.ID)
				desc, err := temporalClient.DescribeWorkflowExecution(
					context.Background(),
					workflowId,
					"",
				)
				if err != nil {
					pipelineLog.Error("failed to query workflow execution: %w", err)
					continue
				}
				// workflow is terminated by outsider
				s := desc.WorkflowExecutionInfo.Status
				if s != v11.WORKFLOW_EXECUTION_STATUS_RUNNING {
					rp.Status = models.TASK_COMPLETED
					if s != v11.WORKFLOW_EXECUTION_STATUS_COMPLETED {
						rp.Status = models.TASK_FAILED
						// get error message
						hisIter := temporalClient.GetWorkflowHistory(
							context.Background(),
							workflowId,
							"",
							false,
							v11.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
						)
						for hisIter.HasNext() {
							his, err := hisIter.Next()
							if err != nil {
								pipelineLog.Error("failed to get next from workflow history iterator: %w", err)
								continue
							}
							rp.Message = fmt.Sprintf("temporal event type: %v", his.GetEventType())
						}
					}
					rp.FinishedAt = desc.WorkflowExecutionInfo.CloseTime
					err = db.Model(rp).Updates(map[string]interface{}{
						"status":      rp.Status,
						"message":     rp.Message,
						"finished_at": rp.FinishedAt,
					}).Error
					if err != nil {
						pipelineLog.Error("failed to update db: %w", err)
					}
					continue
				}

				// check pending activity
				for _, activity := range desc.PendingActivities {
					taskId, err := getTaskIdFromActivityId(activity.ActivityId)
					if err != nil {
						pipelineLog.Error("unable to extract task id from activity id `%s`", activity.ActivityId)
						continue
					}
					// register the task even when no heartbeat is available yet
					progressDetail := &models.TaskProgressDetail{}
					progressDetails[taskId] = progressDetail
					heartbeats := activity.GetHeartbeatDetails()
					if heartbeats == nil {
						continue
					}
					payloads := heartbeats.GetPayloads()
					if len(payloads) == 0 {
						// nothing to decode for this activity; keep watching
						continue
					}
					lastPayload := payloads[len(payloads)-1]
					err = dc.FromPayload(lastPayload, progressDetail)
					if err != nil {
						pipelineLog.Error("failed to unmarshal heartbeat payload: %w", err)
						continue
					}
				}
			}
			runningTasks.setAll(progressDetails)
		}
	}()
}
func runPipelineStandalone(pipelineId uint64) error {
return runner.RunPipeline(
cfg,
pipelineLog.Nested(fmt.Sprintf("pipeline #%d", pipelineId)),
db,
pipelineId,
runTasksStandalone,
)
}
// NotifyExternal reports the pipeline's current state to the external
// notification endpoint.
//
// It is a no-op returning nil when no notification service is configured.
// Otherwise it reloads the pipeline and sends its id, timestamps and status;
// the load error or the send error (which is also logged) is returned.
func NotifyExternal(pipelineId uint64) error {
	// notifications are optional
	if notificationService == nil {
		return nil
	}

	pipeline, err := GetPipeline(pipelineId)
	if err != nil {
		return err
	}

	// send notification to an external web endpoint
	notification := PipelineNotification{
		PipelineID: pipeline.ID,
		CreatedAt:  pipeline.CreatedAt,
		UpdatedAt:  pipeline.UpdatedAt,
		BeganAt:    pipeline.BeganAt,
		FinishedAt: pipeline.FinishedAt,
		Status:     pipeline.Status,
	}
	if err = notificationService.PipelineStatusChanged(notification); err != nil {
		pipelineLog.Error("failed to send notification: %w", err)
		return err
	}
	return nil
}
// CancelPipeline requests cancellation of a pipeline.
//
// With a Temporal client configured it cancels the pipeline's workflow and
// returns that call's error. In standalone mode it looks up the pipeline's
// pending tasks and cancels each of them; cancellation is best-effort, so a
// task that cannot be cancelled is logged and the remaining tasks are still
// attempted. In standalone mode only a failure to list the tasks is returned.
func CancelPipeline(pipelineId uint64) error {
	if temporalClient != nil {
		return temporalClient.CancelWorkflow(context.Background(), getTemporalWorkflowId(pipelineId), "")
	}

	pendingTasks, _, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, PageSize: -1})
	if err != nil {
		return err
	}
	for _, pendingTask := range pendingTasks {
		if cancelErr := CancelTask(pendingTask.ID); cancelErr != nil {
			pipelineLog.Error("failed to cancel task #%v of pipeline #%d: %v", pendingTask.ID, pipelineId, cancelErr)
		}
	}
	return nil
}