blob: 1d2dd4e96a2a2e84e77ddf375bb20306bd9e71a9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import (
"context"
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/metrics"
"github.com/gogf/gf/util/gconv"
"github.com/reactivex/rxgo/v2"
)
func init() {
RegisterQueue(newInMemoryQueue())
}
type inMemoryQueue struct {
ch chan rxgo.Item
observable rxgo.Observable
workflowDAL dal.WorkflowDAL
}
func newInMemoryQueue() ObserveQueue {
var p inMemoryQueue
p.ch = make(chan rxgo.Item)
p.observable = rxgo.FromChannel(p.ch)
p.workflowDAL = dal.NewWorkflowDAL()
return &p
}
// Name returns queue's name.
func (q *inMemoryQueue) Name() string {
return constants.QueueTypeInMemory
}
func (q *inMemoryQueue) Publish(tasks []*model.WorkflowTaskInstance) error {
if len(tasks) == 0 {
return nil
}
metrics.Add(constants.MetricsTaskQueue, fmt.Sprintf("%s_%s", q.Name(), constants.MetricsQueueSize),
float64(len(tasks)))
for _, t := range tasks {
q.ch <- rxgo.Of(t)
}
return nil
}
func (q *inMemoryQueue) Ack(tasks *model.WorkflowTaskInstance) error {
return nil
}
func (q *inMemoryQueue) Observe() {
go func() {
defer func() {
if err := recover(); err != nil {
log.Get(constants.LogQueue).Errorf("Observe error=%+v", err)
}
}()
for item := range q.observable.Observe() {
metrics.Dec(constants.MetricsTaskQueue, fmt.Sprintf("%s_%s", q.Name(), constants.MetricsQueueSize))
q.handle(item)
}
}()
}
func (q *inMemoryQueue) handle(item rxgo.Item) {
v, ok := item.V.(*model.WorkflowTaskInstance)
if !ok {
return
}
log.Get(constants.LogQueue).Infof("handle=%s", gconv.String(v))
if v.ID != 0 {
if err := q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), v); err != nil {
log.Get(constants.LogQueue).Errorf("Observe UpdateTaskInstance error=%v", err)
}
return
}
if err := q.workflowDAL.InsertTaskInstance(context.Background(), v); err != nil {
log.Get(constants.LogQueue).Errorf("Observe InsertTaskInstance error=%v", err)
}
}