// blob: 910c11e494e50414d71f6467d1515cf9f79e9ca7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flow
import (
"context"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/metrics"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/queue"
"github.com/google/uuid"
)
// Engine starts workflow instances and moves their task instances forward.
// It reads and writes workflow state through workflowDAL and hands task
// instances to queue for processing.
type Engine struct {
	// workflowDAL is the data-access layer used to look up tasks and to
	// insert workflow instances.
	workflowDAL dal.WorkflowDAL
	// queue receives the task instances published by Start and Transition.
	queue queue.ObserveQueue
}
// NewEngine returns an Engine backed by a fresh workflow DAL and by the queue
// selected through the global configuration value Flow.Queue.Store.
func NewEngine() *Engine {
	// Field expressions are evaluated in source order, so the DAL is created
	// before the queue is resolved.
	return &Engine{
		workflowDAL: dal.NewWorkflowDAL(),
		queue:       queue.GetQueue(config.GlobalConfig().Flow.Queue.Store),
	}
}
// Validate is meant to verify that a workflow execution is legal.
//
// NOTE(review): this is currently a stub. It does not use ctx or instanceID
// and always returns nil.
func (e *Engine) Validate(ctx context.Context, instanceID string) error {
	return nil
}
// Start creates a new instance of the workflow identified by param.ID and
// publishes its start task to the queue.
//
// On success it returns the generated workflow instance ID. It returns
// ErrWorkflowNotExists when the DAL yields no start task for the workflow;
// errors from the DAL and from the queue are returned unchanged. The returned
// ID is always empty when the error is non-nil.
func (e *Engine) Start(ctx context.Context, param *WorkflowParam) (string, error) {
	metrics.Inc(constants.MetricsEngine, constants.MetricsStartRequest)

	startTask, err := e.workflowDAL.SelectStartTask(ctx, model.WorkflowTask{WorkflowID: param.ID})
	if err != nil {
		return "", err
	}
	if startTask == nil {
		return "", ErrWorkflowNotExists
	}

	workflowInstanceID := uuid.New().String()
	instance := &model.WorkflowInstance{
		WorkflowID:         param.ID,
		WorkflowInstanceID: workflowInstanceID,
		WorkflowStatus:     constants.WorkflowInstanceProcessStatus,
	}
	if err = e.workflowDAL.InsertInstance(ctx, instance); err != nil {
		return "", err
	}

	// The first task instance carries the caller's input and is flagged as the
	// start of the workflow.
	taskInstance := &model.WorkflowTaskInstance{
		WorkflowInstanceID: workflowInstanceID,
		WorkflowID:         param.ID,
		TaskID:             startTask.TaskID,
		TaskInstanceID:     uuid.New().String(),
		Status:             constants.TaskInstanceWaitStatus,
		Input:              param.Input,
		IsStart:            true,
	}

	// Do not hand back an instance ID alongside an error: every other failure
	// path returns "", and callers must not treat the ID as usable when the
	// start task never reached the queue.
	// NOTE(review): the instance inserted above is left in place when
	// publishing fails; confirm whether it should be cleaned up or marked.
	if err = e.queue.Publish([]*model.WorkflowTaskInstance{taskInstance}); err != nil {
		return "", err
	}
	return workflowInstanceID, nil
}
// Transition re-queues the task instance selected for the given workflow
// parameters.
//
// It asks the DAL for a transition task using param.ID, param.InstanceID and
// param.TaskInstanceID together with the sleep status. When the DAL returns no
// task the call is a no-op and nil is returned. Otherwise the task's status is
// set to the wait status and the task is published to the queue. DAL and queue
// errors are returned unchanged.
func (e *Engine) Transition(ctx context.Context, param *WorkflowParam) error {
	metrics.Inc(constants.MetricsEngine, constants.MetricsTransitionRequest)

	filter := model.WorkflowTaskInstance{
		WorkflowID:         param.ID,
		WorkflowInstanceID: param.InstanceID,
		TaskInstanceID:     param.TaskInstanceID,
		Status:             constants.TaskInstanceSleepStatus,
	}
	next, err := e.workflowDAL.SelectTransitionTask(ctx, filter)
	switch {
	case err != nil:
		return err
	case next == nil:
		// Nothing to transition.
		return nil
	}

	next.Status = constants.TaskInstanceWaitStatus
	return e.queue.Publish([]*model.WorkflowTaskInstance{next})
}