blob: 0e7ede5b61ee4f999b99d8f01cf5a2f1285d9a88 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schedule
import (
"context"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/filter"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/metrics"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/task"
"github.com/avast/retry-go/v4"
"gorm.io/gorm"
"time"
)
func init() {
schedulerBuilder["in-line"] = NewInlineScheduler
}
const (
schedulerLockKey = "scheduler_lock"
schedulerLockTimeout = 2
)
type inlineScheduler struct {
workflowDAL dal.WorkflowDAL
}
func NewInlineScheduler() Scheduler {
var s inlineScheduler
s.workflowDAL = dal.NewWorkflowDAL()
return &s
}
func (s *inlineScheduler) Run() {
go func() {
ticker := time.NewTicker(time.Millisecond * time.Duration(config.Get().Flow.Scheduler.Interval))
defer func() {
if err := recover(); err != nil {
log.Get(constants.LogSchedule).Errorf("schedule run error=%+v", err)
}
ticker.Stop()
}()
for range ticker.C {
s.handle()
}
}()
}
func (s *inlineScheduler) handle() {
if err := dal.GetDalClient().Transaction(func(tx *gorm.DB) error {
var res *model.WorkflowTaskInstance
if err := s.lock(func() error {
var err error
res, err = s.fetchTask(tx)
if err != nil {
return err
}
return nil
}); err != nil {
return err
}
if res == nil {
return nil
}
// filter input data
filter.FilterWorkflowTaskInputData(res)
t := task.New(res)
if t == nil {
return nil
}
if err := retry.Do(func() error {
return t.Run()
}, retry.Attempts(constants.RetryAttempts)); err != nil {
return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
Status: constants.TaskInstanceFailStatus})
}
return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
Status: constants.TaskInstanceSuccessStatus})
}); err != nil {
log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
}
}
func (s *inlineScheduler) lock(h func() error) error {
start := time.Now()
l, err := dal.GetLockClient().ObtainTimeout(schedulerLockKey, schedulerLockTimeout)
elapsed := time.Since(start).Milliseconds()
metrics.RecordLatency(constants.MetricsScheduler, constants.MetricsDbLockAcquireTime, float64(elapsed))
if err != nil {
return err
}
defer l.Release()
return h()
}
func (s *inlineScheduler) fetchTask(tx *gorm.DB) (*model.WorkflowTaskInstance, error) {
var err error
res, err := s.workflowDAL.SelectTaskInstance(context.Background(),
model.WorkflowTaskInstance{Status: constants.TaskInstanceWaitStatus})
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
if err = s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
Status: constants.TaskInstanceProcessStatus}); err != nil {
return nil, err
}
return res, nil
}