[fix] task queue will block when subscriber busy
diff --git a/pkg/queue/taskqueue.go b/pkg/queue/taskqueue.go
index 1947cf7..862e734 100644
--- a/pkg/queue/taskqueue.go
+++ b/pkg/queue/taskqueue.go
@@ -19,8 +19,11 @@
import (
"context"
+ "fmt"
+ "sync"
"github.com/apache/servicecomb-service-center/pkg/goutil"
+ "github.com/apache/servicecomb-service-center/pkg/log"
"github.com/go-chassis/foundation/gopool"
)
@@ -42,8 +45,10 @@
type TaskQueue struct {
Workers []Worker
- taskCh chan Task
- goroutine *gopool.Pool
+ taskChLock sync.RWMutex
+ taskChSize int
+ taskCh chan Task
+ goroutine *gopool.Pool
}
// AddWorker is the method to add Worker
@@ -53,7 +58,24 @@
// Add is the method to add task in queue, one task will be handled by all workers
func (q *TaskQueue) Add(t Task) {
- q.taskCh <- t
+ q.taskChLock.RLock()
+ select {
+ case q.taskCh <- t:
+ q.taskChLock.RUnlock()
+ default:
+ q.taskChLock.RUnlock()
+ q.resetTaskCh()
+ }
+}
+
+func (q *TaskQueue) resetTaskCh() {
+ q.taskChLock.Lock()
+ defer q.taskChLock.Unlock()
+
+ log.Warn(fmt.Sprintf("taskCh[%d] is full, reset taskCh", q.taskChSize))
+ close(q.taskCh)
+ q.taskCh = make(chan Task, q.taskChSize)
+ q.Run()
}
func (q *TaskQueue) dispatch(ctx context.Context, w Worker, obj interface{}) {
@@ -82,7 +104,11 @@
select {
case <-ctx.Done():
return
- case task := <-q.taskCh:
+ case task, ok := <-q.taskCh:
+ if !ok {
+ log.Warn("taskCh is closed")
+ return
+ }
q.Do(ctx, task)
}
}
@@ -99,7 +125,8 @@
size = eventQueueSize
}
return &TaskQueue{
- taskCh: make(chan Task, size),
- goroutine: goutil.New(gopool.Configure()),
+ taskChSize: size,
+ taskCh: make(chan Task, size),
+ goroutine: goutil.New(gopool.Configure()),
}
}
diff --git a/pkg/queue/taskqueue_test.go b/pkg/queue/taskqueue_test.go
index 681ba64..0df7d88 100644
--- a/pkg/queue/taskqueue_test.go
+++ b/pkg/queue/taskqueue_test.go
@@ -20,6 +20,7 @@
import (
"context"
"testing"
+ "time"
)
type mockWorker struct {
@@ -32,7 +33,6 @@
func TestNewEventQueue(t *testing.T) {
h := &mockWorker{make(chan interface{}, 1)}
-
q := NewTaskQueue(0)
q.AddWorker(h)
@@ -59,3 +59,15 @@
q.Stop()
q.Add(Task{Payload: 3})
}
+
+func TestTaskQueue_Add(t *testing.T) {
+ h := &mockWorker{make(chan interface{}, 10000)}
+ q := NewTaskQueue(5)
+ q.AddWorker(h)
+ q.Run()
+ time.Sleep(100 * time.Millisecond)
+ for i := 0; i < 10000; i++ {
+ go q.Add(Task{Payload: 1})
+ }
+ q.Stop()
+}