[Issue 527] fix send goroutine blocked (#530)

* fix failTimeoutMessages() poll wrong item from block queue

* add leave out lock

Co-authored-by: boojiang <boojiang@tencent.com>
Co-authored-by: xiaolongran <xiaolongran@tencent.com>
diff --git a/pulsar/internal/blocking_queue.go b/pulsar/internal/blocking_queue.go
index 8162301..b44ec16 100644
--- a/pulsar/internal/blocking_queue.go
+++ b/pulsar/internal/blocking_queue.go
@@ -32,6 +32,9 @@
 	// Poll dequeue one item, return nil if queue is empty
 	Poll() interface{}
 
+	// CompareAndPoll compare the first item and poll it if meet the conditions
+	CompareAndPoll(compare func(item interface{}) bool) interface{}
+
 	// Peek return the first item without dequeing, return nil if queue is empty
 	Peek() interface{}
 
@@ -117,6 +120,20 @@
 	return bq.dequeue()
 }
 
+func (bq *blockingQueue) CompareAndPoll(compare func(interface{}) bool) interface{} {
+	bq.mutex.Lock()
+	defer bq.mutex.Unlock()
+
+	if bq.size == 0 {
+		return nil
+	}
+
+	if compare(bq.items[bq.headIdx]) {
+		return bq.dequeue()
+	}
+	return nil
+}
+
 func (bq *blockingQueue) Peek() interface{} {
 	bq.mutex.Lock()
 	defer bq.mutex.Unlock()
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 002d261..8b3d33d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -515,21 +515,37 @@
 
 		// iterate at most viewSize items
 		for i := 0; i < viewSize; i++ {
-			item := p.pendingQueue.Poll()
+			tickerNeedWaiting := time.Duration(0)
+			item := p.pendingQueue.CompareAndPoll(
+				func(m interface{}) bool {
+					if m == nil {
+						return false
+					}
+
+					pi := m.(*pendingItem)
+					pi.Lock()
+					defer pi.Unlock()
+					if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
+						// current and subsequent items not timeout yet, stop iterating
+						tickerNeedWaiting = nextWaiting
+						return false
+					}
+					return true
+				})
+
 			if item == nil {
 				t.Reset(p.options.SendTimeout)
 				break
 			}
 
-			pi := item.(*pendingItem)
-			pi.Lock()
-			if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
-				// current and subsequent items not timeout yet, stop iterating
-				t.Reset(nextWaiting)
-				pi.Unlock()
+			if tickerNeedWaiting > 0 {
+				t.Reset(tickerNeedWaiting)
 				break
 			}
 
+			pi := item.(*pendingItem)
+			pi.Lock()
+
 			for _, i := range pi.sendRequests {
 				sr := i.(*sendRequest)
 				if sr.msg != nil {