blob: 73c3dbd0bddd91993d113f56ba45770fc83d3641 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
v11 ""
// Package-level state shared by the pipeline service functions in this file.
var (
	// notificationService sends pipeline status notifications. Within this
	// file it is only assigned by pipelineServiceInit, and only when
	// NOTIFICATION_ENDPOINT is non-blank; NotifyExternal is a no-op while it
	// is nil.
	notificationService *NotificationService

	// temporalClient is assigned by pipelineServiceInit when TEMPORAL_URL is
	// set; CancelPipeline checks it for nil before using it.
	temporalClient client.Client

	// globalPipelineLog is the global logger nested under the
	// "pipeline service" name.
	globalPipelineLog = logruslog.Global.Nested("pipeline service")
)
// PipelineQuery is a query for GetPipelines.
// The struct tags name the request parameters each field is read from;
// presumably they are consumed by the HTTP binding layer - verify against
// the caller.
// NOTE(review): the closing brace of this struct is not present in this copy
// of the file, and further fields may have been dropped with it - confirm
// against upstream before relying on the field list below.
type PipelineQuery struct {
// Status is tagged with the form parameter status.
Status string `form:"status"`
// Pending is tagged with the form parameter pending.
Pending int `form:"pending"`
// BlueprintId is tagged with the uri parameter blueprintId and the form
// parameter blueprint_id.
BlueprintId uint64 `uri:"blueprintId" form:"blueprint_id"`
// Label is tagged with the form parameter label.
Label string `form:"label"`
// pipelineServiceInit wires up the pipeline service from configuration: the
// optional notification service, the optional Temporal client, the
// standalone-mode status reset, blueprint reloading and the pipeline queue.
// NOTE(review): this copy of the file has lost lines (closing braces, the
// bodies of several error branches and some call arguments); the comments
// below describe only the statements that are still visible.
func pipelineServiceInit() {
// notification: the service is only created when NOTIFICATION_ENDPOINT is
// not blank
var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
if strings.TrimSpace(notificationEndpoint) != "" {
notificationService = NewNotificationService(notificationEndpoint, notificationSecret)
// temporal client: only created when TEMPORAL_URL is set
var temporalUrl = cfg.GetString("TEMPORAL_URL")
if temporalUrl != "" {
// TODO: logger
var err error
temporalClient, err = client.NewClient(client.Options{
HostPort: temporalUrl,
// NOTE(review): the body of this error branch is not visible in this copy.
if err != nil {
} else {
// standalone mode: reset pipeline status
// Rows whose status is still TASK_RUNNING are switched to TASK_FAILED with
// errMsg as their message.
errMsg := "The process was terminated unexpectedly"
// NOTE(review): the leading arguments of both UpdateColumns calls (the
// target model and the opening of the column list) are not visible here, so
// it cannot be told from this copy which tables the two calls update.
err := db.UpdateColumns(
{ColumnName: "status", Value: models.TASK_FAILED},
{ColumnName: "message", Value: errMsg},
dal.Where("status = ?", models.TASK_RUNNING),
if err != nil {
err = db.UpdateColumns(
{ColumnName: "status", Value: models.TASK_FAILED},
{ColumnName: "message", Value: errMsg},
dal.Where("status = ?", models.TASK_RUNNING),
if err != nil {
// reload blueprints, passing the cron manager
err := ReloadBlueprints(cronManager)
if err != nil {
// PIPELINE_MAX_PARALLEL bounds how many pipelines run at once: negative
// values panic, and zero is replaced by 10000 after logging a warning.
// NOTE(review): the panic message asks for a positive integer although zero
// is accepted below.
var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
if pipelineMaxParallel < 0 {
panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL should be a positive integer`))
if pipelineMaxParallel == 0 {
globalPipelineLog.Warn(nil, `pipelineMaxParallel=0 means pipeline will be run No Limit`)
pipelineMaxParallel = 10000
// run the pipeline queue in its own goroutine
go RunPipelineInQueue(pipelineMaxParallel)
// CreatePipeline and return the model
func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, errors.Error) {
pipeline, err := CreateDbPipeline(newPipeline)
if err != nil {
return nil, errors.Convert(err)
return pipeline, nil
// GetPipelines by query
func GetPipelines(query *PipelineQuery) ([]*models.Pipeline, int64, errors.Error) {
pipelines, i, err := GetDbPipelines(query)
if err != nil {
return nil, 0, errors.Convert(err)
for _, p := range pipelines {
err = fillPipelineDetail(p)
if err != nil {
return nil, 0, err
return pipelines, i, nil
// GetPipeline by id
func GetPipeline(pipelineId uint64) (*models.Pipeline, errors.Error) {
dbPipeline, err := GetDbPipeline(pipelineId)
if err != nil {
return nil, err
err = fillPipelineDetail(dbPipeline)
if err != nil {
return nil, err
return dbPipeline, nil
// GetPipelineLogsArchivePath creates an archive for the logs of this pipeline and returns its file path
func GetPipelineLogsArchivePath(pipeline *models.Pipeline) (string, errors.Error) {
logPath, err := getPipelineLogsPath(pipeline)
if err != nil {
return "", err
archive := fmt.Sprintf("%s/%s/logging.tar.gz", os.TempDir(), uuid.New())
if err = utils.CreateGZipArchive(archive, fmt.Sprintf("%s/*", logPath)); err != nil {
return "", err
return archive, err
// RunPipelineInQueue queries pending pipelines from the db and runs each one
// in its own goroutine. A weighted semaphore of size pipelineMaxParallel caps
// how many run at once: one unit is acquired before a pipeline is picked and
// released when its goroutine finishes.
// NOTE(review): this copy of the file has lost lines (closing braces, the
// bodies of several error branches, loop exits and parts of the db.First
// call); the comments below describe only the statements still visible.
func RunPipelineInQueue(pipelineMaxParallel int64) {
sema := semaphore.NewWeighted(pipelineMaxParallel)
// labels with the parallel/ prefix that belong to pipelines started below
// and not yet finished
runningParallelLabels := []string{}
// NOTE(review): no Lock/Unlock call on this mutex is visible in this copy
// although runningParallelLabels is modified from the goroutines started
// below - confirm against upstream that the slice is guarded.
var runningParallelLabelLock sync.Mutex
for {
globalPipelineLog.Info("acquire lock")
// start goroutine when sema lock ready and pipeline exist.
// to avoid read old pipeline, acquire lock before read exist pipeline
err := sema.Acquire(context.TODO(), 1)
// NOTE(review): the body of this error branch is not visible in this copy.
if err != nil {
globalPipelineLog.Info("get lock and wait next pipeline")
dbPipeline := &models.Pipeline{}
for {
// prepare query to find an appropriate pipeline to execute: the first row by
// ascending id whose status is TASK_CREATED or TASK_RERUN.
// NOTE(review): the join clause below is incomplete in this copy (column
// names and the surrounding call are missing), so the exact label filter
// cannot be stated from here.
err := db.First(dbPipeline,
dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN}),
`left join _devlake_pipeline_labels ON
_devlake_pipeline_labels.pipeline_id = AND LIKE 'parallel/%' AND in ?`,
dal.Orderby("id ASC"),
if err == nil {
// next pipeline found
// errors other than not-found are logged
if !db.IsErrorNotFound(err) {
// log unexpected err
globalPipelineLog.Error(err, "dequeue failed")
// mark the pipeline running: set the status, clear the message and stamp
// began_at with the current time
err = db.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_RUNNING},
{ColumnName: "message", Value: ""},
{ColumnName: "began_at", Value: time.Now()},
}, dal.Where("id = ?", dbPipeline.ID))
if err != nil {
// add pipelineParallelLabels to runningParallelLabels
var pipelineParallelLabels []string
// fillPipelineDetail is called before dbPipeline.Labels is read
err = fillPipelineDetail(dbPipeline)
if err != nil {
// keep only the labels that start with parallel/
for _, dbLabel := range dbPipeline.Labels {
if strings.HasPrefix(dbLabel, `parallel/`) {
pipelineParallelLabels = append(pipelineParallelLabels, dbLabel)
runningParallelLabels = append(runningParallelLabels, pipelineParallelLabels...)
// run the pipeline in its own goroutine; the id and labels are passed as
// arguments so the goroutine does not depend on the loop variables
go func(pipelineId uint64, parallelLabels []string) {
// give the semaphore unit back when the pipeline is done
defer sema.Release(1)
// on exit, drop this pipeline's labels from the running set and log it
defer func() {
runningParallelLabels = utils.SliceRemove(runningParallelLabels, parallelLabels...)
globalPipelineLog.Info("finish pipeline #%d, now runningParallelLabels is %s", pipelineId, runningParallelLabels)
globalPipelineLog.Info("run pipeline, %d, now running runningParallelLabels are %s", pipelineId, runningParallelLabels)
// NOTE(review): this appears to assign to the err variable captured from
// the enclosing loop rather than to a goroutine-local one - confirm that
// this is intended.
err = runPipeline(pipelineId)
if err != nil {
globalPipelineLog.Error(err, "failed to run pipeline %d", pipelineId)
}(dbPipeline.ID, pipelineParallelLabels)
// watchTemporalPipelines starts a goroutine that, on every tick of a
// 3-second ticker, loads the pipelines whose status is TASK_RUNNING and
// checks each of them against its Temporal workflow execution.
// NOTE(review): this copy of the file has lost lines (closing braces, call
// arguments, the status switch and the bodies of several branches); the
// comments below describe only the statements still visible.
func watchTemporalPipelines() {
ticker := time.NewTicker(3 * time.Second)
// default data converter, used below to decode heartbeat payloads
dc := converter.GetDefaultDataConverter()
go func() {
// run forever
for range ticker.C {
// load all running pipeline from database
runningDbPipelines := make([]models.Pipeline, 0)
err := db.All(&runningDbPipelines, dal.Where("status = ?", models.TASK_RUNNING))
// NOTE(review): the body of this error branch is not visible in this copy.
if err != nil {
// progressDetails will be only used in this goroutine now
// So it needn't lock and unlock now
progressDetails := make(map[uint64]*models.TaskProgressDetail)
// check their status against temporal
for _, rp := range runningDbPipelines {
workflowId := getTemporalWorkflowId(rp.ID)
// NOTE(review): the arguments of this call are not visible in this copy.
desc, err := temporalClient.DescribeWorkflowExecution(
if err != nil {
globalPipelineLog.Error(err, "failed to query workflow execution: %v", err)
// workflow is terminated by outsider
s := desc.WorkflowExecutionInfo.Status
// NOTE(review): the conditions selecting between the two status assignments
// below are not visible in this copy.
rp.Status = models.TASK_COMPLETED
rp.Status = models.TASK_FAILED
// get error message
// NOTE(review): the arguments of this call are not visible in this copy.
hisIter := temporalClient.GetWorkflowHistory(
for hisIter.HasNext() {
his, err := hisIter.Next()
if err != nil {
globalPipelineLog.Error(err, "failed to get next from workflow history iterator: %v", err)
// the message records the type of the history event
rp.Message = fmt.Sprintf("temporal event type: %v", his.GetEventType())
// take the finish time from the workflow close time, then persist the
// status, message and finish time
rp.FinishedAt = desc.WorkflowExecutionInfo.CloseTime
err = db.UpdateColumns(rp, []dal.DalSet{
{ColumnName: "status", Value: rp.Status},
{ColumnName: "message", Value: rp.Message},
{ColumnName: "finished_at", Value: rp.FinishedAt},
if err != nil {
globalPipelineLog.Error(err, "failed to update db: %v", err)
// check pending activity
for _, activity := range desc.PendingActivities {
taskId, err := getTaskIdFromActivityId(activity.ActivityId)
if err != nil {
globalPipelineLog.Error(err, "unable to extract task id from activity id `%s`", activity.ActivityId)
// register an empty progress detail for the task; it is filled from the
// last heartbeat payload below when one can be decoded
progressDetail := &models.TaskProgressDetail{}
progressDetails[taskId] = progressDetail
heartbeats := activity.GetHeartbeatDetails()
// NOTE(review): the bodies of the nil check and the empty check are not
// visible in this copy; presumably they skip to the next activity - confirm.
if heartbeats == nil {
payloads := heartbeats.GetPayloads()
if len(payloads) == 0 {
// only the most recent payload is decoded
lastPayload := payloads[len(payloads)-1]
if err := dc.FromPayload(lastPayload, progressDetail); err != nil {
globalPipelineLog.Error(err, "failed to unmarshal heartbeat payload: %v", err)
// getTemporalWorkflowId returns the Temporal workflow ID for a pipeline, in
// the form "pipeline #<id>".
func getTemporalWorkflowId(pipelineId uint64) string {
	return fmt.Sprintf("pipeline #%d", pipelineId)
}
// NotifyExternal FIXME ...
func NotifyExternal(pipelineId uint64) errors.Error {
if notificationService == nil {
return nil
// send notification to an external web endpoint
pipeline, err := GetPipeline(pipelineId)
if err != nil {
return err
err = notificationService.PipelineStatusChanged(PipelineNotification{
PipelineID: pipeline.ID,
CreatedAt: pipeline.CreatedAt,
UpdatedAt: pipeline.UpdatedAt,
BeganAt: pipeline.BeganAt,
FinishedAt: pipeline.FinishedAt,
Status: pipeline.Status,
if err != nil {
globalPipelineLog.Error(err, "failed to send notification: %v", err)
return err
return nil
// CancelPipeline cancels the pipeline with the given id.
// A pipeline that has not started yet (status TASK_CREATED or TASK_RERUN) is
// marked TASK_CANCELLED in the db together with its tasks. Otherwise the
// Temporal workflow is cancelled when a Temporal client is configured, or
// every pending task of the pipeline is cancelled through CancelTask.
// NOTE(review): this copy of the file has lost lines (closing braces, the
// call that acquires cronLocker and the first argument of UpdateColumn); the
// comments below describe only the statements still visible.
func CancelPipeline(pipelineId uint64) errors.Error {
// prevent RunPipelineInQueue from consuming pending pipelines
// NOTE(review): the matching lock acquisition is not visible in this copy.
defer cronLocker.Unlock()
pipeline := &models.Pipeline{}
err := db.First(pipeline, dal.Where("id = ?", pipelineId))
// any lookup error is reported as a bad-input error
if err != nil {
return errors.BadInput.New("pipeline not found")
if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
pipeline.Status = models.TASK_CANCELLED
err = db.Update(pipeline)
if err != nil {
return errors.Default.Wrap(err, "faile to update pipeline")
// now, with RunPipelineInQueue being blocked and the target pipeline updated,
// we should update the related tasks as well
// NOTE(review): the model argument of this call is not visible in this copy.
err = db.UpdateColumn(
"status", models.TASK_CANCELLED,
dal.Where("pipeline_id = ?", pipelineId),
if err != nil {
return errors.Default.Wrap(err, "faile to update pipeline tasks")
// the target pipeline is pending, not running, so there is no need to
// perform the actual cancel operation
return nil
// with a Temporal client configured, cancellation is delegated to the
// workflow identified by getTemporalWorkflowId
if temporalClient != nil {
return errors.Convert(temporalClient.CancelWorkflow(context.Background(), getTemporalWorkflowId(pipelineId), ""))
// otherwise fetch the pending tasks of the pipeline, with PageSize -1
pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
if err != nil {
return errors.Convert(err)
if count == 0 {
return nil
// errors from cancelling individual tasks are deliberately discarded
for _, pendingTask := range pendingTasks {
_ = CancelTask(pendingTask.ID)
// NOTE(review): err still holds the GetTasks result checked above, so this
// appears to always convert a nil error - confirm.
return errors.Convert(err)
// getPipelineLogsPath gets the logs directory of this pipeline
func getPipelineLogsPath(pipeline *models.Pipeline) (string, errors.Error) {
pipelineLog := GetPipelineLogger(pipeline)
path := filepath.Dir(pipelineLog.GetConfig().Path)
_, err := os.Stat(path)
if err == nil {
return path, nil
if os.IsNotExist(err) {
return "", errors.NotFound.Wrap(err, fmt.Sprintf("logs for pipeline #%d not found", pipeline.ID))
return "", errors.Default.Wrap(err, fmt.Sprintf("error validating logs path for pipeline #%d", pipeline.ID))
// RerunPipeline would rerun all failed tasks or specified task.
// When task is non-nil only that task is rerun and it must belong to
// pipelineId; otherwise every task returned by GetTasksWithLastStatus whose
// status is not TASK_COMPLETED is rerun. For each selected task the old row
// is marked TASK_FAILED and a new task is created with IsRerun set; finally
// the pipeline status is set to TASK_RERUN. The newly created tasks are
// returned.
// A pipeline that is running or waiting to run is rejected with a bad-input
// error, as is a call that selects no task.
// NOTE(review): this copy of the file has lost lines (closing braces and
// delimiters, and the call that acquires cronLocker); the comments below
// describe only the statements still visible.
func RerunPipeline(pipelineId uint64, task *models.Task) ([]*models.Task, errors.Error) {
// prevent pipeline executor from doing anything that might jeopardize the integrity
// NOTE(review): the matching lock acquisition is not visible in this copy.
defer cronLocker.Unlock()
// load the pipeline
pipeline, err := GetPipeline(pipelineId)
if err != nil {
return nil, err
// verify the status
if pipeline.Status == models.TASK_RUNNING {
return nil, errors.BadInput.New("pipeline is running")
if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
return nil, errors.BadInput.New("pipeline is waiting to run")
// determine which tasks to rerun
var failedTasks []*models.Task
if task != nil {
// an explicitly specified task must belong to this pipeline
if task.PipelineId != pipelineId {
return nil, errors.BadInput.New("the task ID and pipeline ID doesn't match")
failedTasks = append(failedTasks, task)
} else {
// no task specified: pick every task that is not completed
tasks, err := GetTasksWithLastStatus(pipelineId)
if err != nil {
return nil, errors.Default.Wrap(err, "error getting tasks")
for _, t := range tasks {
if t.Status != models.TASK_COMPLETED {
failedTasks = append(failedTasks, t)
// no tasks to rerun
if len(failedTasks) == 0 {
return nil, errors.BadInput.New("no tasks to be re-ran")
// create new tasks
// TODO: this is better to be wrapped inside a transaction
rerunTasks := []*models.Task{}
for _, t := range failedTasks {
// mark previous task failed, both in memory and in the db
t.Status = models.TASK_FAILED
err := db.UpdateColumn(t, "status", models.TASK_FAILED)
if err != nil {
return nil, err
// create new task from the plugin, subtasks, options and pipeline position
// of the previous one
subtasks, err := t.GetSubTasks()
if err != nil {
return nil, err
options, err := t.GetOptions()
if err != nil {
return nil, err
rerunTask, err := CreateTask(&models.NewTask{
PipelineTask: &plugin.PipelineTask{
Plugin: t.Plugin,
Subtasks: subtasks,
Options: options,
PipelineId: t.PipelineId,
PipelineRow: t.PipelineRow,
PipelineCol: t.PipelineCol,
IsRerun: true,
if err != nil {
return nil, err
// append to result
rerunTasks = append(rerunTasks, rerunTask)
// mark pipeline rerun
err = db.UpdateColumn(&models.Pipeline{},
"status", models.TASK_RERUN,
dal.Where("id = ?", pipelineId),
if err != nil {
return nil, err
return rerunTasks, nil