asynchronized send timeout checking (#460)

Fixes #458

### Motivation

For current producer, send timeout checking triggered by interval batch flush
If connection closed, the producer eventloop will blocked to reconnect to broker, lead to batch flush and send timeout checking take no effective, java-client timer did effective in this situation

### Modifications

Asynchronized send timeout by running in independent goroutine until producer closed, and without a pending queue lock

### Verifying this change

- [x] Make sure that the change passes the CI checks.

### Others

Without pending queue lock, the send timeout checking  gets more complicated, I don't know if it's worth it for performance.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 2536909..41a1b9b 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -140,6 +140,9 @@
 	p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
 	p.setProducerState(producerReady)
 
+	if p.options.SendTimeout > 0 {
+		go p.failTimeoutMessages()
+	}
 	go p.runEventsLoop()
 
 	return p, nil
@@ -427,10 +430,6 @@
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
-	if p.options.SendTimeout > 0 {
-		p.failTimeoutMessages()
-	}
-
 	batchData, sequenceID, callbacks := p.batchBuilder.Flush()
 	if batchData == nil {
 		return
@@ -446,46 +445,91 @@
 }
 
 func (p *partitionProducer) failTimeoutMessages() {
-	// since Closing/Closed connection couldn't be reopen, load and compare is safe
-	state := p.getProducerState()
-	if state == producerClosing || state == producerClosed {
-		return
+	diff := func(sentAt time.Time) time.Duration {
+		return p.options.SendTimeout - time.Since(sentAt)
 	}
 
-	item := p.pendingQueue.Peek()
-	if item == nil {
-		// pending queue is empty
-		return
-	}
+	t := time.NewTimer(p.options.SendTimeout)
+	defer t.Stop()
 
-	pi := item.(*pendingItem)
-	if time.Since(pi.sentAt) < p.options.SendTimeout {
-		// pending messages not timeout yet
-		return
-	}
+	for range t.C {
+		state := p.getProducerState()
+		if state == producerClosing || state == producerClosed {
+			return
+		}
 
-	p.log.Infof("Failing %d messages", p.pendingQueue.Size())
-	for p.pendingQueue.Size() > 0 {
-		pi = p.pendingQueue.Poll().(*pendingItem)
-		pi.Lock()
-		for _, i := range pi.sendRequests {
-			sr := i.(*sendRequest)
-			if sr.msg != nil {
-				size := len(sr.msg.Payload)
-				p.publishSemaphore.Release()
-				p.metrics.MessagesPending.Dec()
-				p.metrics.BytesPending.Sub(float64(size))
-				p.metrics.PublishErrorsTimeout.Inc()
-				p.log.WithError(errSendTimeout).
-					WithField("size", size).
-					WithField("properties", sr.msg.Properties)
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			// pending queue is empty
+			t.Reset(p.options.SendTimeout)
+			continue
+		}
+		oldestItem := item.(*pendingItem)
+		if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
+			// none of these pending messages have timed out, wait and retry
+			t.Reset(nextWaiting)
+			continue
+		}
+
+		// since pending queue is not thread safe because of there is no global iteration lock
+		// to control poll from pending queue, current goroutine and connection receipt handler
+		// iterate pending queue at the same time, this maybe a performance trade-off
+		// see https://github.com/apache/pulsar-client-go/pull/301
+		curViewItems := p.pendingQueue.ReadableSlice()
+		viewSize := len(curViewItems)
+		if viewSize <= 0 {
+			// double check
+			t.Reset(p.options.SendTimeout)
+			continue
+		}
+		p.log.Infof("Failing %d messages", viewSize)
+		lastViewItem := curViewItems[viewSize-1].(*pendingItem)
+
+		// iterate at most viewSize items
+		for i := 0; i < viewSize; i++ {
+			item := p.pendingQueue.Poll()
+			if item == nil {
+				t.Reset(p.options.SendTimeout)
+				break
 			}
-			if sr.callback != nil {
-				sr.callback(nil, sr.msg, errSendTimeout)
+
+			pi := item.(*pendingItem)
+			pi.Lock()
+			if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
+				// current and subsequent items not timeout yet, stop iterating
+				t.Reset(nextWaiting)
+				pi.Unlock()
+				break
+			}
+
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					p.metrics.MessagesPending.Dec()
+					p.metrics.BytesPending.Sub(float64(size))
+					p.metrics.PublishErrorsTimeout.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+
+			// flag the send has completed with error, flush make no effect
+			pi.completed = true
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+
+			// finally reached the last view item, current iteration ends
+			if pi == lastViewItem {
+				t.Reset(p.options.SendTimeout)
+				break
 			}
 		}
-		buffersPool.Put(pi.batchData)
-		pi.Unlock()
 	}
 }