blob: ce22670d29cc8944c68d663ed1508fbe95564b13 [file] [log] [blame]
//Copyright 2017 Huawei Technologies Co., Ltd
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
package store
import (
"errors"
"github.com/ServiceComb/service-center/util"
"golang.org/x/net/context"
"sync"
"time"
)
const (
DEFAULT_MAX_TASK_COUNT = 1000
DEFAULT_REMOVE_TASKS_INTERVAL = 30 * time.Second
)
type AsyncTask interface {
Key() string
Do(ctx context.Context) error
Err() error
}
type AsyncTasker interface {
AddTask(ctx context.Context, task AsyncTask) error
RemoveTask(key string) error
LatestHandled(key string) (AsyncTask, error)
Run()
Stop()
Ready() <-chan struct{}
}
type BaseAsyncTasker struct {
queues map[string]*util.UniQueue
latestTasks map[string]AsyncTask
removeTasks map[string]struct{}
goroutine *util.GoRoutine
queueLock sync.RWMutex
ready chan struct{}
isClose bool
}
func (lat *BaseAsyncTasker) AddTask(ctx context.Context, task AsyncTask) error {
if task == nil || ctx == nil {
return errors.New("invalid parameters")
}
lat.queueLock.RLock()
queue, ok := lat.queues[task.Key()]
latestTask := lat.latestTasks[task.Key()]
lat.queueLock.RUnlock()
if !ok {
lat.queueLock.Lock()
queue, ok = lat.queues[task.Key()]
if !ok {
queue = util.NewUniQueue()
lat.queues[task.Key()] = queue
latestTask = task
lat.latestTasks[task.Key()] = latestTask
}
lat.queueLock.Unlock()
}
if !ok || lat.isClose {
// do immediately at first time
return task.Do(ctx)
}
err := queue.Put(ctx, task)
if err != nil {
return err
}
util.LOGGER.Debugf("task done! key is %s", task.Key())
handled, err := lat.LatestHandled(task.Key())
if err != nil {
return err
}
return handled.Err()
}
func (lat *BaseAsyncTasker) RemoveTask(key string) error {
lat.queueLock.Lock()
if lat.isClose {
lat.queueLock.Unlock()
return errors.New("AsyncTasker is stopped")
}
lat.removeTasks[key] = struct{}{}
lat.queueLock.Unlock()
return nil
}
func (lat *BaseAsyncTasker) removeTask(key string) {
lat.queueLock.Lock()
delete(lat.queues, key)
delete(lat.latestTasks, key)
delete(lat.removeTasks, key)
lat.queueLock.Unlock()
util.LOGGER.Debugf("remove task, key is %s", key)
}
func (lat *BaseAsyncTasker) LatestHandled(key string) (AsyncTask, error) {
lat.queueLock.RLock()
at, ok := lat.latestTasks[key]
lat.queueLock.RUnlock()
if !ok {
return nil, errors.New("expired behavior")
}
return at, nil
}
func (lat *BaseAsyncTasker) schedule(stopCh <-chan struct{}) {
util.SafeCloseChan(lat.ready)
ready := make(chan AsyncTask, DEFAULT_MAX_TASK_COUNT)
defer func() {
close(ready)
util.LOGGER.Debugf("AsyncTasker is ready to stop")
}()
for {
select {
case <-stopCh:
util.LOGGER.Debugf("scheduler exited for AsyncTasker is stopped")
return
default:
go lat.collectReadyTasks(ready)
ctx, _ := context.WithTimeout(context.Background(), time.Second)
select {
case task := <-ready:
lat.scheduleTask(task.(AsyncTask))
lat.scheduleReadyTasks(ready)
case <-ctx.Done():
util.LOGGER.Debugf("timed out to collect ready tasks")
}
}
}
}
func (lat *BaseAsyncTasker) daemonRemoveTask(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
util.LOGGER.Debugf("daemon remove task exited for AsyncTasker is stopped")
return
case <-time.After(DEFAULT_REMOVE_TASKS_INTERVAL):
if lat.isClose {
return
}
removes := make([]string, 0, len(lat.removeTasks))
lat.queueLock.RLock()
for key := range lat.removeTasks {
removes = append(removes, key)
}
lat.queueLock.RUnlock()
for _, key := range removes {
lat.removeTask(key)
}
util.LOGGER.Debugf("daemon remove task is running, %d removed", len(removes))
}
}
}
func (lat *BaseAsyncTasker) Run() {
lat.queueLock.Lock()
if !lat.isClose {
lat.queueLock.Unlock()
return
}
lat.isClose = false
lat.queueLock.Unlock()
lat.goroutine.Do(lat.schedule)
lat.goroutine.Do(lat.daemonRemoveTask)
}
func (lat *BaseAsyncTasker) scheduleReadyTasks(ready <-chan AsyncTask) {
ctx, _ := context.WithTimeout(context.Background(), time.Second)
for {
select {
case task := <-ready:
lat.scheduleTask(task.(AsyncTask))
case <-ctx.Done():
return
}
}
}
func (lat *BaseAsyncTasker) collectReadyTasks(ready chan<- AsyncTask) {
lat.queueLock.RLock()
for key, queue := range lat.queues {
select {
case task, ok := <-queue.Chan():
util.LOGGER.Debugf("get task in queue[%v], key is %s", ok, key)
if !ok {
continue
}
ready <- task.(AsyncTask) // will block when a lot of tasks coming in.
default:
util.LOGGER.Debugf("no task in queue, key is %s", key)
}
}
lat.queueLock.RUnlock()
}
func (lat *BaseAsyncTasker) scheduleTask(at AsyncTask) {
util.LOGGER.Debugf("start to run task, key is %s", at.Key())
lat.goroutine.Do(func(stopCh <-chan struct{}) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
lat.queueLock.RLock()
_, ok := lat.latestTasks[at.Key()]
if !ok {
lat.queueLock.RUnlock()
util.LOGGER.Debugf("task is removed, key is %s", at.Key())
return
}
lat.queueLock.RUnlock()
at.Do(ctx)
lat.queueLock.Lock()
lat.latestTasks[at.Key()] = at
lat.queueLock.Unlock()
}()
select {
case <-ctx.Done():
util.LOGGER.Debugf("finish to handle task, key is %s", at.Key())
case <-stopCh:
cancel()
util.LOGGER.Debugf("cancelled task for AsyncTasker is stopped, key is %s", at.Key())
}
})
}
func (lat *BaseAsyncTasker) Stop() {
lat.queueLock.Lock()
if lat.isClose {
lat.queueLock.Unlock()
return
}
lat.isClose = true
for key, queue := range lat.queues {
queue.Close()
delete(lat.queues, key)
delete(lat.latestTasks, key)
}
for key := range lat.removeTasks {
delete(lat.removeTasks, key)
}
lat.queueLock.Unlock()
lat.goroutine.Close(true)
util.SafeCloseChan(lat.ready)
util.LOGGER.Debugf("AsyncTasker is stopped")
}
func (lat *BaseAsyncTasker) Ready() <-chan struct{} {
return lat.ready
}
func NewAsyncTasker() AsyncTasker {
return &BaseAsyncTasker{
latestTasks: make(map[string]AsyncTask),
queues: make(map[string]*util.UniQueue),
removeTasks: make(map[string]struct{}),
goroutine: util.NewGo(make(chan struct{})),
ready: make(chan struct{}),
isClose: true,
}
}