[Issue 527] fix send goroutine blocked (#530)
* fix failTimeoutMessages() poll wrong item from block queue
* add leave out lock
Co-authored-by: boojiang <boojiang@tencent.com>
Co-authored-by: xiaolongran <xiaolongran@tencent.com>
diff --git a/pulsar/internal/blocking_queue.go b/pulsar/internal/blocking_queue.go
index 8162301..b44ec16 100644
--- a/pulsar/internal/blocking_queue.go
+++ b/pulsar/internal/blocking_queue.go
@@ -32,6 +32,9 @@
// Poll dequeue one item, return nil if queue is empty
Poll() interface{}
+ // CompareAndPoll compare the first item and poll it if meet the conditions
+ CompareAndPoll(compare func(item interface{}) bool) interface{}
+
// Peek return the first item without dequeing, return nil if queue is empty
Peek() interface{}
@@ -117,6 +120,20 @@
return bq.dequeue()
}
+func (bq *blockingQueue) CompareAndPoll(compare func(interface{}) bool) interface{} {
+ bq.mutex.Lock()
+ defer bq.mutex.Unlock()
+
+ if bq.size == 0 {
+ return nil
+ }
+
+ if compare(bq.items[bq.headIdx]) {
+ return bq.dequeue()
+ }
+ return nil
+}
+
func (bq *blockingQueue) Peek() interface{} {
bq.mutex.Lock()
defer bq.mutex.Unlock()
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 002d261..8b3d33d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -515,21 +515,37 @@
// iterate at most viewSize items
for i := 0; i < viewSize; i++ {
- item := p.pendingQueue.Poll()
+ tickerNeedWaiting := time.Duration(0)
+ item := p.pendingQueue.CompareAndPoll(
+ func(m interface{}) bool {
+ if m == nil {
+ return false
+ }
+
+ pi := m.(*pendingItem)
+ pi.Lock()
+ defer pi.Unlock()
+ if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
+ // current and subsequent items not timeout yet, stop iterating
+ tickerNeedWaiting = nextWaiting
+ return false
+ }
+ return true
+ })
+
if item == nil {
t.Reset(p.options.SendTimeout)
break
}
- pi := item.(*pendingItem)
- pi.Lock()
- if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
- // current and subsequent items not timeout yet, stop iterating
- t.Reset(nextWaiting)
- pi.Unlock()
+ if tickerNeedWaiting > 0 {
+ t.Reset(tickerNeedWaiting)
break
}
+ pi := item.(*pendingItem)
+ pi.Lock()
+
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {