Support sendTimeout (#394)

* Support sendTimeout

* Substitute NonBlockIfQueueFull for BlockIfQueueFull

* Rename NonBlockIfQueueFull to DisableBlockIfQueueFull

* Replaced createdAt with sentAt which added in PR 397.

* Check and fail timeout messages before flushing batch

* Optimize code

* Remove redundant checking
diff --git a/integration-tests/standalone.conf b/integration-tests/standalone.conf
index 3fdf53a..b426ff3 100644
--- a/integration-tests/standalone.conf
+++ b/integration-tests/standalone.conf
@@ -54,7 +54,7 @@
 backlogQuotaCheckEnabled=true
 
 # How often to check for topics that have reached the quota
-backlogQuotaCheckIntervalInSeconds=60
+backlogQuotaCheckIntervalInSeconds=5
 
 # Default per-topic backlog quota limit
 backlogQuotaDefaultLimitGB=10
diff --git a/pulsar/producer.go b/pulsar/producer.go
index a2b7526..1dc0775 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -76,6 +76,15 @@
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, negative such as -1 to disable.
+	SendTimeout time.Duration
+
+	// DisableBlockIfQueueFull control whether Send and SendAsync block if producer's message queue is full.
+	// Default is false, if set to true then Send and SendAsync return error when queue is full.
+	DisableBlockIfQueueFull bool
+
 	// MaxPendingMessages set the max size of the queue holding the messages pending to receive an
 	// acknowledgment from the broker.
 	MaxPendingMessages int
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index c166f70..51ae69e 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -32,6 +32,9 @@
 )
 
 const (
+	// defaultSendTimeout init default timeout for ack since sent.
+	defaultSendTimeout = 30 * time.Second
+
 	// defaultBatchingMaxPublishDelay init default for maximum delay to batch messages
 	defaultBatchingMaxPublishDelay = 10 * time.Millisecond
 
@@ -91,13 +94,16 @@
 		return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
 	}
 
+	if options.SendTimeout == 0 {
+		options.SendTimeout = defaultSendTimeout
+	}
 	if options.BatchingMaxMessages == 0 {
 		options.BatchingMaxMessages = defaultMaxMessagesPerBatch
 	}
 	if options.BatchingMaxSize == 0 {
 		options.BatchingMaxSize = defaultMaxBatchSize
 	}
-	if options.BatchingMaxPublishDelay == 0 {
+	if options.BatchingMaxPublishDelay <= 0 {
 		options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay
 	}
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3397b6e..2a9093f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -46,6 +46,8 @@
 
 var (
 	errFailAddBatch    = errors.New("message send failed")
+	errSendTimeout     = errors.New("message send timeout")
+	errSendQueueIsFull = errors.New("producer send queue is full")
 	errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
 
 	buffersPool sync.Pool
@@ -430,26 +432,74 @@
 	sync.Mutex
 	batchData    internal.Buffer
 	sequenceID   uint64
-	sentAt       int64
+	sentAt       time.Time
 	sendRequests []interface{}
 	completed    bool
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
+	if p.options.SendTimeout > 0 {
+		p.failTimeoutMessages()
+	}
+
 	batchData, sequenceID, callbacks := p.batchBuilder.Flush()
 	if batchData == nil {
 		return
 	}
 
 	p.pendingQueue.Put(&pendingItem{
+		sentAt:       time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
-		sentAt:       time.Now().UnixNano(),
 		sendRequests: callbacks,
 	})
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages() {
+	// since Closing/Closed connection couldn't be reopen, load and compare is safe
+	state := atomic.LoadInt32(&p.state)
+	if state == producerClosing || state == producerClosed {
+		return
+	}
+
+	item := p.pendingQueue.Peek()
+	if item == nil {
+		// pending queue is empty
+		return
+	}
+
+	pi := item.(*pendingItem)
+	if time.Since(pi.sentAt) < p.options.SendTimeout {
+		// pending messages not timeout yet
+		return
+	}
+
+	p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+	for p.pendingQueue.Size() > 0 {
+		pi = p.pendingQueue.Poll().(*pendingItem)
+		pi.Lock()
+		for _, i := range pi.sendRequests {
+			sr := i.(*sendRequest)
+			if sr.msg != nil {
+				size := len(sr.msg.Payload)
+				p.publishSemaphore.Release()
+				messagesPending.Dec()
+				bytesPending.Sub(float64(size))
+				publishErrors.Inc()
+				p.log.WithError(errSendTimeout).
+					WithField("size", size).
+					WithField("properties", sr.msg.Properties)
+			}
+			if sr.callback != nil {
+				sr.callback(nil, sr.msg, errSendTimeout)
+			}
+		}
+		buffersPool.Put(pi.batchData)
+		pi.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
@@ -516,10 +566,20 @@
 	}
 	p.options.Interceptors.BeforeSend(p, msg)
 
+	if p.options.DisableBlockIfQueueFull {
+		if !p.publishSemaphore.TryAcquire() {
+			if callback != nil {
+				callback(nil, msg, errSendQueueIsFull)
+			}
+			return
+		}
+	} else {
+		p.publishSemaphore.Acquire()
+	}
+
 	messagesPending.Inc()
 	bytesPending.Add(float64(len(sr.msg.Payload)))
 
-	p.publishSemaphore.Acquire()
 	p.eventsChan <- sr
 }
 
@@ -553,9 +613,7 @@
 	// lock the pending item while sending the requests
 	pi.Lock()
 	defer pi.Unlock()
-	if pi.sentAt > 0 {
-		publishRPCLatency.Observe(float64(now-pi.sentAt) / 1.0e9)
-	}
+	publishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
 	for idx, i := range pi.sendRequests {
 		sr := i.(*sendRequest)
 		if sr.msg != nil {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 3fe46c4..9475051 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -882,6 +882,54 @@
 	}
 }
 
+func TestSendTimeout(t *testing.T) {
+	quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota"
+	quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`
+	makeHTTPCall(t, http.MethodPost, quotaURL, fmt.Sprintf(quotaFmt, 10*1024))
+
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "send_timeout_sub",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close() // subscribe but do nothing
+
+	noRetry := uint(0)
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                topicName,
+		SendTimeout:          2 * time.Second,
+		MaxReconnectToBroker: &noRetry,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	for i := 0; i < 10; i++ {
+		id, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: make([]byte, 1024),
+		})
+		assert.Nil(t, err)
+		assert.NotNil(t, id)
+	}
+
+	// waiting for the backlog check
+	time.Sleep((5 + 1) * time.Second)
+
+	id, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: make([]byte, 1024),
+	})
+	assert.NotNil(t, err)
+	assert.Nil(t, id)
+
+	makeHTTPCall(t, http.MethodDelete, quotaURL, "")
+}
+
 type noopProduceInterceptor struct{}
 
 func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}