Support sendTimeout (#394)
* Support sendTimeout
* Substitute NonBlockIfQueueFull for BlockIfQueueFull
* Rename NonBlockIfQueueFull to DisableBlockIfQueueFull
* Replace createdAt with sentAt, which was added in PR 397.
* Check and fail timeout messages before flushing batch
* Optimize code
* Remove redundant checking
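For illustration only, a minimal sketch of how an application might opt into the two new options; the service URL, topic name, and timeout value below are placeholders rather than part of this change:

```go
package main

import (
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder service URL
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "my-topic", // placeholder topic
		// Fail sends that are not acknowledged by the broker within 10 seconds;
		// 0 keeps the 30-second default, a negative value such as -1 disables the timeout.
		SendTimeout: 10 * time.Second,
		// Return an error instead of blocking when the pending-message queue is full.
		DisableBlockIfQueueFull: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
}
```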
diff --git a/integration-tests/standalone.conf b/integration-tests/standalone.conf
index 3fdf53a..b426ff3 100644
--- a/integration-tests/standalone.conf
+++ b/integration-tests/standalone.conf
@@ -54,7 +54,7 @@
backlogQuotaCheckEnabled=true
# How often to check for topics that have reached the quota
-backlogQuotaCheckIntervalInSeconds=60
+backlogQuotaCheckIntervalInSeconds=5
# Default per-topic backlog quota limit
backlogQuotaDefaultLimitGB=10
diff --git a/pulsar/producer.go b/pulsar/producer.go
index a2b7526..1dc0775 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -76,6 +76,15 @@
// This properties will be visible in the topic stats
Properties map[string]string
+ // SendTimeout sets the timeout for an outgoing message that has not been acknowledged by the server.
+ // Send and SendAsync return an error once the timeout elapses.
+ // Default is 30 seconds; a negative value such as -1 disables the timeout.
+ SendTimeout time.Duration
+
+ // DisableBlockIfQueueFull controls whether Send and SendAsync block when the producer's message queue is full.
+ // Default is false; if set to true, Send and SendAsync return an error when the queue is full.
+ DisableBlockIfQueueFull bool
+
// MaxPendingMessages set the max size of the queue holding the messages pending to receive an
// acknowledgment from the broker.
MaxPendingMessages int
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index c166f70..51ae69e 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -32,6 +32,9 @@
)
const (
+ // defaultSendTimeout init default timeout for waiting on an ack after a message is sent.
+ defaultSendTimeout = 30 * time.Second
+
// defaultBatchingMaxPublishDelay init default for maximum delay to batch messages
defaultBatchingMaxPublishDelay = 10 * time.Millisecond
@@ -91,13 +94,16 @@
return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
}
+ if options.SendTimeout == 0 {
+ options.SendTimeout = defaultSendTimeout
+ }
if options.BatchingMaxMessages == 0 {
options.BatchingMaxMessages = defaultMaxMessagesPerBatch
}
if options.BatchingMaxSize == 0 {
options.BatchingMaxSize = defaultMaxBatchSize
}
- if options.BatchingMaxPublishDelay == 0 {
+ if options.BatchingMaxPublishDelay <= 0 {
options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3397b6e..2a9093f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -46,6 +46,8 @@
var (
errFailAddBatch = errors.New("message send failed")
+ errSendTimeout = errors.New("message send timeout")
+ errSendQueueIsFull = errors.New("producer send queue is full")
errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
buffersPool sync.Pool
@@ -430,26 +432,74 @@
sync.Mutex
batchData internal.Buffer
sequenceID uint64
- sentAt int64
+ sentAt time.Time
sendRequests []interface{}
completed bool
}
func (p *partitionProducer) internalFlushCurrentBatch() {
+ if p.options.SendTimeout > 0 {
+ p.failTimeoutMessages()
+ }
+
batchData, sequenceID, callbacks := p.batchBuilder.Flush()
if batchData == nil {
return
}
p.pendingQueue.Put(&pendingItem{
+ sentAt: time.Now(),
batchData: batchData,
sequenceID: sequenceID,
- sentAt: time.Now().UnixNano(),
sendRequests: callbacks,
})
p.cnx.WriteData(batchData)
}
+func (p *partitionProducer) failTimeoutMessages() {
+ // since a Closing/Closed connection cannot be reopened, a plain load-and-compare of the state is safe here
+ state := atomic.LoadInt32(&p.state)
+ if state == producerClosing || state == producerClosed {
+ return
+ }
+
+ item := p.pendingQueue.Peek()
+ if item == nil {
+ // pending queue is empty
+ return
+ }
+
+ pi := item.(*pendingItem)
+ if time.Since(pi.sentAt) < p.options.SendTimeout {
+ // the oldest pending message has not timed out yet, so no newer one has either
+ return
+ }
+
+ p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+ for p.pendingQueue.Size() > 0 {
+ pi = p.pendingQueue.Poll().(*pendingItem)
+ pi.Lock()
+ for _, i := range pi.sendRequests {
+ sr := i.(*sendRequest)
+ if sr.msg != nil {
+ size := len(sr.msg.Payload)
+ p.publishSemaphore.Release()
+ messagesPending.Dec()
+ bytesPending.Sub(float64(size))
+ publishErrors.Inc()
+ p.log.WithError(errSendTimeout).
+ WithField("size", size).
+ WithField("properties", sr.msg.Properties).
+ Error("send timeout")
+ }
+ if sr.callback != nil {
+ sr.callback(nil, sr.msg, errSendTimeout)
+ }
+ }
+ buffersPool.Put(pi.batchData)
+ pi.Unlock()
+ }
+}
+
func (p *partitionProducer) internalFlush(fr *flushRequest) {
p.internalFlushCurrentBatch()
@@ -516,10 +566,20 @@
}
p.options.Interceptors.BeforeSend(p, msg)
+ if p.options.DisableBlockIfQueueFull {
+ if !p.publishSemaphore.TryAcquire() {
+ if callback != nil {
+ callback(nil, msg, errSendQueueIsFull)
+ }
+ return
+ }
+ } else {
+ p.publishSemaphore.Acquire()
+ }
+
messagesPending.Inc()
bytesPending.Add(float64(len(sr.msg.Payload)))
- p.publishSemaphore.Acquire()
p.eventsChan <- sr
}
@@ -553,9 +613,7 @@
// lock the pending item while sending the requests
pi.Lock()
defer pi.Unlock()
- if pi.sentAt > 0 {
- publishRPCLatency.Observe(float64(now-pi.sentAt) / 1.0e9)
- }
+ publishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
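For illustration, a hedged sketch of how these failures surface to an application through the SendAsync callback; errSendTimeout and errSendQueueIsFull are unexported, so user code only sees them as plain error values, and the helper name below is hypothetical:

```go
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

// publishWithCallback is a hypothetical helper showing how send failures reach
// the application: the unexported errSendTimeout and errSendQueueIsFull arrive
// only as the err argument of the callback.
func publishWithCallback(producer pulsar.Producer, payload []byte) {
	producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
		Payload: payload,
	}, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
		if err != nil {
			// With SendTimeout > 0 this fires once the oldest pending batch
			// exceeds the timeout; with DisableBlockIfQueueFull=true it fires
			// immediately when the pending queue is saturated.
			log.Printf("send failed: %v", err)
			return
		}
		log.Printf("published message %v", id)
	})
}
```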
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 3fe46c4..9475051 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -882,6 +882,54 @@
}
}
+func TestSendTimeout(t *testing.T) {
+ quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota"
+ quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`
+ makeHTTPCall(t, http.MethodPost, quotaURL, fmt.Sprintf(quotaFmt, 10*1024))
+
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "send_timeout_sub",
+ })
+ assert.Nil(t, err)
+ defer consumer.Close() // subscribe but do nothing
+
+ noRetry := uint(0)
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ SendTimeout: 2 * time.Second,
+ MaxReconnectToBroker: &noRetry,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ for i := 0; i < 10; i++ {
+ id, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: make([]byte, 1024),
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, id)
+ }
+
+ // wait for the backlog quota check (configured to run every 5 seconds in standalone.conf)
+ time.Sleep((5 + 1) * time.Second)
+
+ id, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: make([]byte, 1024),
+ })
+ assert.NotNil(t, err)
+ assert.Nil(t, id)
+
+ makeHTTPCall(t, http.MethodDelete, quotaURL, "")
+}
+
type noopProduceInterceptor struct{}
func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}