blob: 270a963b3923284bf33fbcb0a927a83f4f5689da [file] [log] [blame]
package task
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/apache/servicecomb-service-center/eventbase/service/task"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
"github.com/apache/servicecomb-service-center/syncer/service/event"
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
carisync "github.com/go-chassis/cari/sync"
"github.com/go-chassis/foundation/gopool"
)
const (
defaultInternal = 2 * time.Second
heartbeatInternal = 15 * time.Second
taskTTL = 30
taskName = "load--handle-task"
)
func Work() {
work()
}
func work() {
dl := DistributedLock{
key: taskName,
heartbeatDuration: heartbeatInternal,
ttl: taskTTL,
do: func(ctx context.Context) {
m := NewManager()
m.LoadAndHandleTask(ctx)
m.UpdateResultTask(ctx)
},
}
dl.LockDo()
}
// Manager defines task manager, transfer task to event, and send event to event manager
type Manager interface {
LoadAndHandleTask(ctx context.Context)
UpdateResultTask(ctx context.Context)
}
type ManagerOption func(*managerOptions)
type managerOptions struct {
internal time.Duration
operator Operator
eventSender event.Sender
}
func toManagerOptions(os ...ManagerOption) *managerOptions {
mo := new(managerOptions)
mo.internal = defaultInternal
mo.eventSender = event.GetManager()
for _, o := range os {
o(mo)
}
return mo
}
func ManagerInternal(i time.Duration) ManagerOption {
return func(mo *managerOptions) {
mo.internal = i
}
}
func EventSender(e event.Sender) ManagerOption {
return func(options *managerOptions) {
options.eventSender = e
}
}
func ManagerOperator(l Operator) ManagerOption {
return func(mo *managerOptions) {
mo.operator = l
}
}
func NewManager(os ...ManagerOption) Manager {
m := &manager{
toHandleTasks: make([]*carisync.Task, 0, 10),
result: make(chan *event.Result, 1000),
}
mo := toManagerOptions(os...)
if mo.operator == nil {
mo.operator = m
}
m.internal = mo.internal
m.operator = mo.operator
m.eventSender = mo.eventSender
return m
}
type manager struct {
internal time.Duration
ticker *time.Ticker
toHandleTasks []*carisync.Task
isClosing bool
result chan *event.Result
cache sync.Map
operator Operator
eventSender event.Sender
}
// Operator define task operator, to list tasks and delete task
type Operator interface {
ListTasks(ctx context.Context) ([]*carisync.Task, error)
DeleteTask(ctx context.Context, t *carisync.Task) error
}
func (m *manager) LoadAndHandleTask(ctx context.Context) {
gopool.Go(func(goctx context.Context) {
m.ticker = time.NewTicker(m.internal)
for {
select {
case _, ok := <-m.ticker.C:
if !ok {
log.Info("ticker is closed")
return
}
ts, err := m.operator.ListTasks(ctx)
if err != nil {
log.Error("load task failed", err)
continue
}
m.handleTasks(ts)
case <-goctx.Done():
m.Close()
return
case <-ctx.Done():
m.Close()
return
}
}
})
}
func (m *manager) Close() {
m.ticker.Stop()
}
func (m *manager) ListTasks(ctx context.Context) ([]*carisync.Task, error) {
tasks, err := ListTask(ctx)
if err != nil {
return nil, err
}
noHandleTasks := make([]*carisync.Task, 0, len(tasks))
skipTaskIDs := make([]string, 0, len(tasks))
for _, t := range tasks {
_, ok := m.cache.Load(t.ID)
if ok {
skipTaskIDs = append(skipTaskIDs, t.ID)
continue
}
m.cache.Store(t.ID, t)
noHandleTasks = append(noHandleTasks, t)
}
log.Info(fmt.Sprintf("load task raw count %d, to handle count %d, skip ids %v",
len(tasks), len(noHandleTasks), skipTaskIDs))
return noHandleTasks, nil
}
func (m *manager) DeleteTask(ctx context.Context, t *carisync.Task) error {
return task.Delete(ctx, t)
}
func (m *manager) UpdateResultTask(ctx context.Context) {
gopool.Go(func(goctx context.Context) {
log.Info("start updateTasks task")
for {
select {
case res := <-m.result:
if m.isClosing {
m.closeUpdateTasks()
}
m.handleResult(res)
case <-ctx.Done():
m.isClosing = true
case <-goctx.Done():
log.Info("updateTasks exit")
return
}
}
})
}
func (m *manager) closeUpdateTasks() {
c := 0
m.cache.Range(func(_, _ interface{}) bool {
c++
return true
})
if c != 0 {
return
}
close(m.result)
}
func (m *manager) handleResult(res *event.Result) {
if res.Error != nil || res.Data.Code == resource.Fail {
log.Error("get event result, return error ", res.Error)
m.cache.Range(func(key, value interface{}) bool {
if res.ID == key {
m.cache.Delete(key)
return false
}
return true
})
return
}
if res.Data == nil {
log.Info("result data is empty")
return
}
log.Info(fmt.Sprintf("key,result: %s", res.ID))
t, ok := m.cache.LoadAndDelete(res.ID)
if !ok {
return
}
code := res.Data.Code
if code != resource.Fail {
tk := t.(*carisync.Task)
err := m.operator.DeleteTask(context.TODO(), tk)
if err != nil {
log.Error("delete task failed", err)
}
}
}
func (m *manager) handleTasks(sts syncTasks) {
sort.Sort(sts)
for _, st := range sts {
m.eventSender.Send(toEvent(st, m.result))
}
}
func toEvent(task *carisync.Task, result chan<- *event.Result) *event.Event {
ops := task.Opts
if len(ops) == 0 {
ops = make(map[string]string, 2)
}
ops[string(util.CtxDomain)] = task.Domain
ops[string(util.CtxProject)] = task.Project
return &event.Event{
Event: &v1sync.Event{
Id: task.ID,
Action: task.Action,
Subject: task.ResourceType,
Opts: ops,
Value: task.Resource,
Timestamp: task.Timestamp,
},
Result: result,
}
}