blob: 6067f49e80ec1ae65d0d4efcde69818e48905a44 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dal
import (
"context"
"errors"
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/util"
"time"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/third_party/swf"
"github.com/gogf/gf/util/gconv"
"github.com/google/uuid"
pmodel "github.com/serverlessworkflow/sdk-go/v2/model"
"gorm.io/gorm"
)
const maxSize = 100
type WorkflowDAL interface {
Select(ctx context.Context, tx *gorm.DB, workflowID string) (*model.Workflow, error)
SelectList(ctx context.Context, param *model.QueryParam) ([]model.Workflow, int, error)
Save(ctx context.Context, record *model.Workflow) error
Delete(ctx context.Context, workflowID string) error
SelectInstances(ctx context.Context, param *model.QueryParam) ([]model.WorkflowInstance, int, error)
SelectStartTask(ctx context.Context, condition model.WorkflowTask) (*model.WorkflowTask, error)
SelectTransitionTask(ctx context.Context, condition model.WorkflowTaskInstance) (*model.WorkflowTaskInstance, error)
SelectTaskInstance(ctx context.Context, condition model.WorkflowTaskInstance) (*model.WorkflowTaskInstance, error)
InsertInstance(ctx context.Context, record *model.WorkflowInstance) error
InsertTaskInstance(ctx context.Context, record *model.WorkflowTaskInstance) error
UpdateInstance(ctx context.Context, record *model.WorkflowInstance) error
UpdateTaskInstance(tx *gorm.DB, record *model.WorkflowTaskInstance) error
}
func NewWorkflowDAL() WorkflowDAL {
var w workflowDALImpl
return &w
}
type workflowDALImpl struct {
}
func (w *workflowDALImpl) Select(ctx context.Context, tx *gorm.DB, workflowID string) (*model.Workflow, error) {
var condition = model.Workflow{WorkflowID: workflowID, Status: constants.NormalStatus}
var r model.Workflow
if err := tx.WithContext(ctx).Where(&condition).First(&r).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, err
}
return &r, nil
}
func (w *workflowDALImpl) SelectList(ctx context.Context, param *model.QueryParam) ([]model.Workflow, int, error) {
var res []model.Workflow
var condition = model.Workflow{WorkflowID: param.WorkflowID, Status: param.Status}
db := workflowDB.WithContext(ctx).Where("1=1")
if len(condition.WorkflowID) > 0 {
// Suitable for small amount of data
// when the amount of data is too large, you need to use search engines for optimization
db = db.Where("workflow_id LIKE ?", fmt.Sprintf("%%%s%%", condition.WorkflowID))
}
if condition.Status == 0 {
condition.Status = constants.NormalStatus
}
db = db.Where("status = ?", condition.Status)
if param.Size > maxSize {
param.Size = maxSize
}
if param.Page == 0 {
param.Page = 1
}
var count int64
db = db.Limit(param.Size).Offset(param.Size * (param.Page - 1)).Order("update_time DESC")
if err := db.Find(&res).Limit(-1).Offset(-1).Count(&count).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, 0, nil
}
return nil, 0, err
}
if count == 0 {
return res, int(count), nil
}
if err := w.fillInstanceCount(res); err != nil {
return res, int(count), err
}
return res, int(count), nil
}
func (w *workflowDALImpl) SelectInstances(ctx context.Context, param *model.QueryParam) ([]model.WorkflowInstance,
int, error) {
var r []model.WorkflowInstance
db := workflowDB.WithContext(ctx).Where("workflow_status != ?", constants.InvalidStatus).
Where("workflow_id = ?", param.WorkflowID)
if param.Size > maxSize {
param.Size = maxSize
}
if param.Page == 0 {
param.Page = 1
}
var count int64
db = db.Limit(param.Size).Offset(param.Size * (param.Page - 1)).Order("update_time DESC")
if err := db.Find(&r).Limit(-1).Offset(-1).Count(&count).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, 0, nil
}
return nil, 0, err
}
return r, int(count), nil
}
func (w *workflowDALImpl) SelectStartTask(ctx context.Context, condition model.WorkflowTask) (*model.WorkflowTask,
error) {
var c = model.WorkflowTaskRelation{FromTaskID: constants.TaskStartID, WorkflowID: condition.WorkflowID,
Status: constants.NormalStatus}
var r model.WorkflowTaskRelation
if err := workflowDB.WithContext(ctx).Where(&c).First(&r).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, err
}
return &model.WorkflowTask{TaskID: r.ToTaskID, WorkflowID: condition.WorkflowID}, nil
}
func (w *workflowDALImpl) SelectTransitionTask(ctx context.Context, condition model.WorkflowTaskInstance) (
*model.WorkflowTaskInstance, error) {
var r model.WorkflowTaskInstance
if err := workflowDB.WithContext(ctx).Where(&condition).First(&r).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, err
}
return &r, nil
}
func (w *workflowDALImpl) SelectTaskInstance(ctx context.Context, condition model.WorkflowTaskInstance) (*model.
WorkflowTaskInstance, error) {
var r model.WorkflowTaskInstance
if err := workflowDB.WithContext(ctx).Where(&condition).Order("create_time desc").
First(&r).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, err
}
var err error
var tasks []*model.WorkflowTask
var childTasks []*model.WorkflowTaskRelation
var taskActions []*model.WorkflowTaskAction
var handlers []func() error
handlers = append(handlers, func() error {
tasks, err = w.selectTask(context.Background(), r.WorkflowID, []string{r.TaskID})
return err
})
handlers = append(handlers, func() error {
childTasks, err = w.selectTaskRelation(context.Background(), r.WorkflowID, r.TaskID)
return err
})
handlers = append(handlers, func() error {
taskActions, err = w.selectTaskAction(context.Background(), r.WorkflowID, []string{r.TaskID})
if err != nil {
return err
}
return nil
})
if err = util.GoAndWait(handlers...); err != nil {
return nil, err
}
return w.completeTaskInstance(r, tasks, childTasks, taskActions)
}
// Delete delete workflow and relate info
func (w *workflowDALImpl) Delete(ctx context.Context, workflowID string) error {
return w.delete(workflowDB, workflowID)
}
func (w *workflowDALImpl) Save(ctx context.Context, record *model.Workflow) error {
return workflowDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// first delete and insert
if len(record.WorkflowID) > 0 {
if err := w.delete(tx, record.WorkflowID); err != nil {
return err
}
}
return w.create(ctx, tx, record)
})
}
func (w *workflowDALImpl) InsertInstance(ctx context.Context, record *model.WorkflowInstance) error {
record.CreateTime = time.Now()
record.UpdateTime = time.Now()
return workflowDB.WithContext(ctx).Create(&record).Error
}
func (w *workflowDALImpl) InsertTaskInstance(ctx context.Context,
record *model.WorkflowTaskInstance) error {
record.CreateTime = time.Now()
record.UpdateTime = time.Now()
return workflowDB.WithContext(ctx).Create(&record).Error
}
func (w *workflowDALImpl) UpdateInstance(ctx context.Context, record *model.WorkflowInstance) error {
var condition = model.WorkflowInstance{WorkflowInstanceID: record.WorkflowInstanceID}
record.UpdateTime = time.Now()
return workflowDB.WithContext(ctx).Where(&condition).Updates(&record).Error
}
func (w *workflowDALImpl) UpdateTaskInstance(tx *gorm.DB, record *model.WorkflowTaskInstance) error {
var condition = model.WorkflowTaskInstance{ID: record.ID}
record.UpdateTime = time.Now()
return tx.Where(&condition).Updates(&record).Error
}
func (w *workflowDALImpl) delete(tx *gorm.DB, workflowID string) error {
var handlers []func() error
handlers = append(handlers, func() error {
record := model.Workflow{Status: constants.InvalidStatus, UpdateTime: time.Now()}
return tx.Where("workflow_id = ?", workflowID).Updates(&record).Error
}, func() error {
record := model.WorkflowTask{Status: constants.InvalidStatus, UpdateTime: time.Now()}
return tx.Where("workflow_id = ?", workflowID).Updates(&record).Error
}, func() error {
record := model.WorkflowTaskRelation{Status: constants.InvalidStatus,
UpdateTime: time.Now()}
return tx.Where("workflow_id = ?", workflowID).Updates(&record).Error
}, func() error {
record := model.WorkflowTaskAction{Status: constants.InvalidStatus,
UpdateTime: time.Now()}
return tx.Where("workflow_id = ?", workflowID).Updates(&record).Error
}, func() error {
record := model.WorkflowInstance{WorkflowStatus: constants.InvalidStatus,
UpdateTime: time.Now()}
return tx.Where("workflow_id = ?", workflowID).Updates(&record).Error
}, func() error {
record := model.WorkflowTaskInstance{Status: constants.InvalidStatus,
UpdateTime: time.Now()}
return tx.Where("workflow_id = ?", workflowID).Updates(&record).Error
})
return util.GoAndWait(handlers...)
}
func (w *workflowDALImpl) create(ctx context.Context, tx *gorm.DB, record *model.Workflow) error {
wf, err := swf.Parse(record.Definition)
if err != nil {
return err
}
if wf == nil {
return errors.New("workflow text invalid")
}
r, err := w.Select(ctx, tx, wf.ID)
if err != nil {
return err
}
if r != nil {
return errors.New("workflow id already exists")
}
var insertData = model.Workflow{}
insertData.WorkflowID = wf.ID
insertData.WorkflowName = wf.Name
insertData.Version = wf.Version
insertData.Definition = record.Definition
insertData.Status = constants.NormalStatus
insertData.CreateTime = time.Now()
insertData.UpdateTime = time.Now()
var handlers []func() error
handlers = append(handlers, func() error {
return tx.Create(insertData).Error
})
tasks := w.buildTask(wf)
for _, task := range tasks {
task := task
handlers = append(handlers, func() error {
return tx.Create(task).Error
})
for _, action := range task.Actions {
action := action
handlers = append(handlers, func() error {
return tx.Create(action).Error
})
}
}
taskRelations := w.buildTaskRelation(wf, tasks)
for _, relation := range taskRelations {
relation := relation
handlers = append(handlers, func() error {
return tx.Create(relation).Error
})
}
return util.GoAndWait(handlers...)
}
func (w *workflowDALImpl) buildTask(workflow *pmodel.Workflow) []*model.WorkflowTask {
if workflow == nil || len(workflow.States) == 0 {
return nil
}
var tasks []*model.WorkflowTask
for _, state := range workflow.States {
var task = model.WorkflowTask{}
task.WorkflowID = workflow.ID
task.TaskID = uuid.New().String()
task.TaskName = state.GetName()
task.Status = constants.NormalStatus
task.TaskType = gconv.String(state.GetType())
task.CreateTime = time.Now()
task.UpdateTime = time.Now()
task.Actions = w.buildTaskAction(task.TaskID, workflow, state)
w.fillTaskFilterIfExist(state, &task)
tasks = append(tasks, &task)
}
return tasks
}
func (w *workflowDALImpl) fillTaskFilterIfExist(workflowState pmodel.State, task *model.WorkflowTask) {
filter := workflowState.GetStateDataFilter()
if filter != nil {
task.TaskInputFilter = filter.Input
}
}
func (w *workflowDALImpl) buildTaskAction(taskID string, workflow *pmodel.Workflow,
state pmodel.State) []*model.WorkflowTaskAction {
var functions = make(map[string]*pmodel.Function)
for i, function := range workflow.Functions {
functions[function.Name] = &workflow.Functions[i]
}
switch state.GetType() {
case pmodel.StateTypeOperation:
return w.doBuildOperationTaskAction(workflow.ID, taskID, functions, state)
case pmodel.StateTypeEvent:
return w.doBuildEventTaskAction(workflow.ID, taskID, functions, state)
}
return nil
}
func (w *workflowDALImpl) buildTaskRelation(workflow *pmodel.Workflow,
tasks []*model.WorkflowTask) []*model.WorkflowTaskRelation {
if workflow == nil || len(workflow.States) == 0 {
return nil
}
var taskIDs = make(map[string]string)
for _, task := range tasks {
taskIDs[task.TaskName] = task.TaskID
}
var taskRelations []*model.WorkflowTaskRelation
for _, state := range workflow.States {
if workflow.Start.StateName == state.GetName() {
taskRelations = append(taskRelations, w.doBuildStartTaskRelation(workflow, state, taskIDs))
}
switch state.GetType() {
case pmodel.StateTypeOperation:
fallthrough
case pmodel.StateTypeEvent:
taskRelations = append(taskRelations, w.doBuildTaskRelation(workflow, state, taskIDs))
case pmodel.StateTypeSwitch:
taskRelations = append(taskRelations, w.doBuildSwitchTaskRelation(workflow, state, taskIDs)...)
default:
log.Errorf("buildTaskRelation=not support type=%s", state.GetType())
}
}
return taskRelations
}
func (w *workflowDALImpl) doBuildOperationTaskAction(workflowID string, taskID string,
functions map[string]*pmodel.Function, state pmodel.State) []*model.WorkflowTaskAction {
s, ok := state.(*pmodel.OperationState)
if !ok {
return nil
}
var actions []*model.WorkflowTaskAction
for _, action := range s.Actions {
var taskAction model.WorkflowTaskAction
taskAction.WorkflowID = workflowID
taskAction.TaskID = taskID
function := functions[action.FunctionRef.RefName]
if function == nil {
continue
}
taskAction.OperationName = gconv.String(function.Operation)
taskAction.OperationType = gconv.String(function.Type)
taskAction.Status = constants.NormalStatus
taskAction.CreateTime = time.Now()
taskAction.UpdateTime = time.Now()
actions = append(actions, &taskAction)
}
return actions
}
func (w *workflowDALImpl) doBuildEventTaskAction(workflowID string, taskID string,
functions map[string]*pmodel.Function, state pmodel.State) []*model.WorkflowTaskAction {
s, ok := state.(*pmodel.EventState)
if !ok {
return nil
}
var actions []*model.WorkflowTaskAction
for _, event := range s.OnEvents {
for _, action := range event.Actions {
var taskAction model.WorkflowTaskAction
taskAction.WorkflowID = workflowID
taskAction.TaskID = taskID
function := functions[action.FunctionRef.RefName]
if function == nil {
continue
}
taskAction.OperationName = gconv.String(function.Operation)
taskAction.OperationType = gconv.String(function.Type)
taskAction.Status = constants.NormalStatus
taskAction.CreateTime = time.Now()
taskAction.UpdateTime = time.Now()
actions = append(actions, &taskAction)
}
}
return actions
}
func (w *workflowDALImpl) doBuildTaskRelation(workflow *pmodel.Workflow, state pmodel.State,
taskIDs map[string]string) *model.WorkflowTaskRelation {
var r = model.WorkflowTaskRelation{}
r.WorkflowID = workflow.ID
r.FromTaskID = taskIDs[state.GetName()]
if state.GetTransition() == nil && !state.GetEnd().Terminate {
r.ToTaskID = constants.TaskEndID
} else {
r.ToTaskID = taskIDs[state.GetTransition().NextState]
}
r.Status = constants.NormalStatus
r.CreateTime = time.Now()
r.UpdateTime = time.Now()
return &r
}
func (w *workflowDALImpl) doBuildSwitchTaskRelation(workflow *pmodel.Workflow, state pmodel.State,
taskIDs map[string]string) []*model.WorkflowTaskRelation {
s, ok := state.(*pmodel.DataBasedSwitchState)
if !ok {
return nil
}
var rel []*model.WorkflowTaskRelation
if !s.DefaultCondition.End.Terminate {
var r = model.WorkflowTaskRelation{}
r.WorkflowID = workflow.ID
r.FromTaskID = taskIDs[state.GetName()]
r.ToTaskID = constants.TaskEndID
r.Status = constants.NormalStatus
r.CreateTime = time.Now()
r.UpdateTime = time.Now()
rel = append(rel, &r)
}
for _, condition := range s.DataConditions {
var r = model.WorkflowTaskRelation{}
r.WorkflowID = workflow.ID
r.FromTaskID = taskIDs[state.GetName()]
r.Status = constants.NormalStatus
r.CreateTime = time.Now()
r.UpdateTime = time.Now()
if c, ok := condition.(*pmodel.TransitionDataCondition); ok {
r.ToTaskID = taskIDs[c.Transition.NextState]
r.Condition = c.Condition
}
if c, ok := condition.(*pmodel.EndDataCondition); ok {
r.ToTaskID = constants.TaskEndID
r.Condition = c.Condition
}
rel = append(rel, &r)
}
return rel
}
func (w *workflowDALImpl) doBuildStartTaskRelation(workflow *pmodel.Workflow, state pmodel.State,
taskIDs map[string]string) *model.WorkflowTaskRelation {
var r = model.WorkflowTaskRelation{}
r.WorkflowID = workflow.ID
r.FromTaskID = constants.TaskStartID
r.ToTaskID = taskIDs[state.GetName()]
r.Status = constants.NormalStatus
r.CreateTime = time.Now()
r.UpdateTime = time.Now()
return &r
}
func (w *workflowDALImpl) selectTask(ctx context.Context, workflowID string, taskIDs []string) ([]*model.WorkflowTask,
error) {
if len(taskIDs) == 0 {
return nil, nil
}
var condition = model.WorkflowTask{WorkflowID: workflowID, TaskIDs: taskIDs}
var r []*model.WorkflowTask
if err := workflowDB.WithContext(ctx).Where(&condition).Where("task_id = ?", taskIDs).
Find(&r).Error; err != nil {
return nil, err
}
return r, nil
}
func (w *workflowDALImpl) selectTaskAction(ctx context.Context,
workflowID string, taskIDs []string) ([]*model.WorkflowTaskAction, error) {
if len(taskIDs) == 0 {
return nil, nil
}
var condition = model.WorkflowTaskAction{WorkflowID: workflowID, TaskIDs: taskIDs}
var r []*model.WorkflowTaskAction
if err := workflowDB.WithContext(ctx).Where(&condition).Where("task_id = ?", taskIDs).
Find(&r).Error; err != nil {
return nil, err
}
return r, nil
}
func (w *workflowDALImpl) selectTaskRelation(ctx context.Context, workflowID string, taskID string) (
[]*model.WorkflowTaskRelation, error) {
var relations []*model.WorkflowTaskRelation
var c = model.WorkflowTaskRelation{FromTaskID: taskID, WorkflowID: workflowID, Status: constants.NormalStatus}
if err := workflowDB.WithContext(ctx).Where(&c).Find(&relations).Error; err != nil {
return nil, err
}
return relations, nil
}
func (w *workflowDALImpl) completeTaskInstance(instance model.WorkflowTaskInstance, tasks []*model.WorkflowTask,
childTasks []*model.WorkflowTaskRelation, taskActions []*model.WorkflowTaskAction) (*model.WorkflowTaskInstance, error) {
if len(tasks) == 0 {
return nil, nil
}
var r model.WorkflowTaskInstance
if err := gconv.Struct(instance, &r); err != nil {
return nil, err
}
r.Task = tasks[0]
r.Task.ChildTasks = childTasks
r.Task.Actions = taskActions
return &r, nil
}
func (w *workflowDALImpl) fillInstanceCount(workflows []model.Workflow) error {
var handlers []func() error
for idx := range workflows {
idx := idx
handlers = append(handlers, func() error {
var instances []model.WorkflowInstance
if err := workflowDB.Where("workflow_id = ?", workflows[idx].WorkflowID).
Where("workflow_status != ?", constants.InvalidStatus).Find(&instances).Error; err != nil {
return err
}
if len(instances) == 0 {
return nil
}
workflows[idx].TotalInstances = len(instances)
for _, instance := range instances {
if instance.WorkflowStatus == constants.TaskInstanceFailStatus {
workflows[idx].TotalFailedInstances++
} else {
workflows[idx].TotalRunningInstances++
}
}
return nil
})
}
return util.GoAndWait(handlers...)
}