Added negative acks tracker and integration (#94)
* Added negative acks tracker and integration
* Fixed race condition in tests
* Fixed return after warning
* fixed format
diff --git a/examples/consumer-listener/consumer-listener.go b/examples/consumer-listener/consumer-listener.go
index 0a8e6c9..618c3c0 100644
--- a/examples/consumer-listener/consumer-listener.go
+++ b/examples/consumer-listener/consumer-listener.go
@@ -57,8 +57,6 @@
fmt.Printf("Received message msgId: %v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
- if err := consumer.Ack(msg); err != nil {
- log.Fatal(err)
- }
+ consumer.Ack(msg)
}
}
diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go
index 5250a02..0b819bc 100644
--- a/examples/consumer/consumer.go
+++ b/examples/consumer/consumer.go
@@ -52,9 +52,7 @@
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
- if err := consumer.Ack(msg); err != nil {
- log.Fatal(err)
- }
+ consumer.Ack(msg)
}
if err := consumer.Unsubscribe(); err != nil {
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 8fe2a86..582c06d 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -101,9 +101,7 @@
}
msgReceived++
bytesReceived += int64(len(cm.Message.Payload()))
- if err := consumer.Ack(cm.Message); err != nil {
- return
- }
+ consumer.Ack(cm.Message)
case <-tick.C:
currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
currentBytesReceived := atomic.SwapInt64(&bytesReceived, 0)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index b0bba1b..961611e 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -19,6 +19,7 @@
import (
"context"
+ "time"
)
// Pair of a Consumer and Message
@@ -106,6 +107,10 @@
// ReceiverQueueSize(int) if the total exceeds this value (default: 50000).
MaxTotalReceiverQueueSizeAcrossPartitions int
+ // The delay after which to redeliver the messages that failed to be
+ // processed. Default is 1min. (See `Consumer.Nack()`)
+ NackRedeliveryDelay *time.Duration
+
// Set the consumer name.
Name string
@@ -136,10 +141,28 @@
Chan() <-chan ConsumerMessage
// Ack the consumption of a single message
- Ack(Message) error
+ Ack(Message)
// AckID the consumption of a single message, identified by its MessageID
- AckID(MessageID) error
+ AckID(MessageID)
+
+ // Acknowledge the failure to process a single message.
+ //
+ // When a message is "negatively acked" it will be marked for redelivery after
+ // some fixed delay. The delay is configurable when constructing the consumer
+ // with ConsumerOptions.NAckRedeliveryDelay .
+ //
+ // This call is not blocking.
+ Nack(Message)
+
+ // Acknowledge the failure to process a single message.
+ //
+ // When a message is "negatively acked" it will be marked for redelivery after
+ // some fixed delay. The delay is configurable when constructing the consumer
+ // with ConsumerOptions.NackRedeliveryDelay .
+ //
+ // This call is not blocking.
+ NackID(MessageID)
// Close the consumer and stop the broker to push more messages
Close() error
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 8b73631..0a628b7 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -33,6 +33,8 @@
var ErrConsumerClosed = errors.New("consumer closed")
+const defaultNackRedeliveryDelay = 1 * time.Minute
+
type consumer struct {
options ConsumerOptions
@@ -117,6 +119,13 @@
wg.Add(1)
go func(idx int, pt string) {
defer wg.Done()
+
+ var nackRedeliveryDelay time.Duration
+ if options.NackRedeliveryDelay == nil {
+ nackRedeliveryDelay = defaultNackRedeliveryDelay
+ } else {
+ nackRedeliveryDelay = *options.NackRedeliveryDelay
+ }
opts := &partitionConsumerOpts{
topic: pt,
consumerName: consumerName,
@@ -125,6 +134,7 @@
subscriptionInitPos: options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
+ nackRedeliveryDelay: nackRedeliveryDelay,
}
cons, err := newPartitionConsumer(consumer, client, opts, messageCh)
ch <- ConsumerError{
@@ -199,24 +209,48 @@
}
// Ack the consumption of a single message
-func (c *consumer) Ack(msg Message) error {
- return c.AckID(msg.ID())
+func (c *consumer) Ack(msg Message) {
+ c.AckID(msg.ID())
}
// Ack the consumption of a single message, identified by its MessageID
-func (c *consumer) AckID(msgID MessageID) error {
+func (c *consumer) AckID(msgID MessageID) {
mid, ok := msgID.(*messageID)
if !ok {
- return fmt.Errorf("invalid message id type")
+ c.log.Warnf("invalid message id type")
+ return
}
partition := mid.partitionIdx
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
- return fmt.Errorf("invalid partition index %d expected a partition between [0-%d]",
+ c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
+ return
}
- return c.consumers[partition].AckID(msgID)
+ c.consumers[partition].AckID(mid)
+}
+
+func (c *consumer) Nack(msg Message) {
+ c.AckID(msg.ID())
+}
+
+func (c *consumer) NackID(msgID MessageID) {
+ mid, ok := msgID.(*messageID)
+ if !ok {
+ c.log.Warnf("invalid message id type")
+ return
+ }
+
+ partition := mid.partitionIdx
+ // did we receive a valid partition index?
+ if partition < 0 || partition >= len(c.consumers) {
+ c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
+ partition, len(c.consumers))
+ return
+ }
+
+ c.consumers[partition].NackID(mid)
}
func (c *consumer) Close() error {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index aaf6ade..a8d9b6c 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -47,6 +47,7 @@
subscriptionInitPos SubscriptionInitialPosition
partitionIdx int
receiverQueueSize int
+ nackRedeliveryDelay time.Duration
}
type partitionConsumer struct {
@@ -78,6 +79,8 @@
connectedCh chan struct{}
closeCh chan struct{}
+ nackTracker *negativeAcksTracker
+
log *log.Entry
}
@@ -101,6 +104,7 @@
log: log.WithField("topic", options.topic),
}
pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
+ pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
err := pc.grabConn()
if err != nil {
@@ -143,19 +147,39 @@
pc.conn.DeleteConsumeHandler(pc.consumerID)
}
-func (pc *partitionConsumer) Ack(msg Message) error {
- return pc.AckID(msg.ID())
-}
-
-func (pc *partitionConsumer) AckID(msgID MessageID) error {
+func (pc *partitionConsumer) AckID(msgID *messageID) {
req := &ackRequest{
- doneCh: make(chan struct{}),
- msgID: msgID,
+ msgID: msgID,
}
pc.eventsCh <- req
+}
- <-req.doneCh
- return req.err
+func (pc *partitionConsumer) NackID(msgID *messageID) {
+ pc.nackTracker.Add(msgID)
+}
+
+func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
+ pc.eventsCh <- &redeliveryRequest{msgIds}
+}
+
+func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
+ msgIds := req.msgIds
+ pc.log.Debug("Request redelivery after negative ack for messages", msgIds)
+
+ msgIdDataList := make([]*pb.MessageIdData, len(msgIds))
+ for i := 0; i < len(msgIds); i++ {
+ msgIdDataList[i] = &pb.MessageIdData{
+ LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)),
+ EntryId: proto.Uint64(uint64(msgIds[i].entryID)),
+ }
+ }
+
+ requestID := internal.RequestIDNoResponse
+ pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID,
+ pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
+ ConsumerId: proto.Uint64(pc.consumerID),
+ MessageIds: msgIdDataList,
+ })
}
func (pc *partitionConsumer) Close() error {
@@ -172,28 +196,21 @@
}
func (pc *partitionConsumer) internalAck(req *ackRequest) {
- defer close(req.doneCh)
+ msgId := req.msgID
- id := &pb.MessageIdData{}
- messageIDs := make([]*pb.MessageIdData, 0)
- err := proto.Unmarshal(req.msgID.Serialize(), id)
- if err != nil {
- pc.log.WithError(err).Error("unable to serialize message id")
- req.err = err
+ messageIDs := make([]*pb.MessageIdData, 1)
+ messageIDs[0] = &pb.MessageIdData{
+ LedgerId: proto.Uint64(uint64(msgId.ledgerID)),
+ EntryId: proto.Uint64(uint64(msgId.entryID)),
}
-
- messageIDs = append(messageIDs, id)
requestID := internal.RequestIDNoResponse
cmdAck := &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
MessageId: messageIDs,
AckType: pb.CommandAck_Individual.Enum(),
}
- _, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_ACK, cmdAck)
- if err != nil {
- pc.log.WithError(err).Errorf("failed to ack message_id=%s", id)
- req.err = err
- }
+
+ pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_ACK, cmdAck)
}
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
@@ -369,9 +386,7 @@
}
type ackRequest struct {
- doneCh chan struct{}
- msgID MessageID
- err error
+ msgID *messageID
}
type unsubscribeRequest struct {
@@ -384,6 +399,10 @@
err error
}
+type redeliveryRequest struct {
+ msgIds []messageID
+}
+
func (pc *partitionConsumer) runEventsLoop() {
defer func() {
pc.log.Info("exiting events loop")
@@ -396,6 +415,8 @@
switch v := i.(type) {
case *ackRequest:
pc.internalAck(v)
+ case *redeliveryRequest:
+ pc.internalRedeliver(v)
case *unsubscribeRequest:
pc.internalUnsubscribe(v)
case *connectionClosed:
@@ -429,6 +450,7 @@
pc.log.Info("Closed consumer")
pc.state = consumerClosed
pc.conn.DeleteConsumeHandler(pc.consumerID)
+ pc.nackTracker.Close()
close(pc.closeCh)
}
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 33c2f15..c45b932 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -90,9 +90,7 @@
assert.Equal(t, expectProperties, msg.Properties())
// ack message
- if err := consumer.Ack(msg); err != nil {
- log.Fatal(err)
- }
+ consumer.Ack(msg)
}
}
@@ -163,8 +161,7 @@
for i := 0; i < numOfMessages; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
- err = consumer.Ack(msg)
- assert.Nil(t, err)
+ consumer.Ack(msg)
count++
}
@@ -301,17 +298,13 @@
break
}
receivedConsumer1++
- if err := consumer1.Ack(cm.Message); err != nil {
- log.Fatal(err)
- }
+ consumer1.Ack(cm.Message)
case cm, ok := <-consumer2.Chan():
if !ok {
break
}
receivedConsumer2++
- if err := consumer2.Ack(cm.Message); err != nil {
- log.Fatal(err)
- }
+ consumer2.Ack(cm.Message)
}
}
@@ -372,9 +365,7 @@
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
- if err := consumer.Ack(msg); err != nil {
- assert.Nil(t, err)
- }
+ consumer.Ack(msg)
}
assert.Equal(t, len(msgs), 10)
@@ -465,9 +456,7 @@
payload := string(cm.Message.Payload())
messages[payload] = struct{}{}
fmt.Printf("consumer1 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
- if err := consumer1.Ack(cm.Message); err != nil {
- log.Fatal(err)
- }
+ consumer1.Ack(cm.Message)
case cm, ok := <-consumer2.Chan():
if !ok {
break
@@ -476,9 +465,7 @@
payload := string(cm.Message.Payload())
messages[payload] = struct{}{}
fmt.Printf("consumer2 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
- if err := consumer2.Ack(cm.Message); err != nil {
- log.Fatal(err)
- }
+ consumer2.Ack(cm.Message)
}
}
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index d806087..b0620c2 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -146,9 +146,9 @@
func (p *producer) Flush() error {
for _, pp := range p.producers {
- if err := pp.Flush(); err != nil {
- return err
- }
+ if err := pp.Flush(); err != nil {
+ return err
+ }
}
return nil
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
new file mode 100644
index 0000000..e8a79e8
--- /dev/null
+++ b/pulsar/negative_acks_tracker.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ log "github.com/sirupsen/logrus"
+ "sync"
+ "time"
+)
+
+type redeliveryConsumer interface {
+ Redeliver(msgIds []messageID)
+}
+
+type negativeAcksTracker struct {
+ sync.Mutex
+
+ doneCh chan interface{}
+ negativeAcks map[messageID]time.Time
+ rc redeliveryConsumer
+ tick *time.Ticker
+ delay time.Duration
+}
+
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration) *negativeAcksTracker {
+ t := &negativeAcksTracker{
+ doneCh: make(chan interface{}),
+ negativeAcks: make(map[messageID]time.Time),
+ rc: rc,
+ tick: time.NewTicker(delay / 3),
+ delay: delay,
+ }
+
+ go t.track()
+ return t
+}
+
+func (t *negativeAcksTracker) Add(msgID *messageID) {
+ // Always clear up the batch index since we want to track the nack
+ // for the entire batch
+ batchMsgId := messageID{
+ ledgerID: msgID.ledgerID,
+ entryID: msgID.entryID,
+ batchIdx: 0,
+ }
+
+ t.Lock()
+ defer t.Unlock()
+
+ _, present := t.negativeAcks[batchMsgId]
+ if present {
+ // The batch is already being tracked
+ return
+ } else {
+ targetTime := time.Now().Add(t.delay)
+ t.negativeAcks[batchMsgId] = targetTime
+ }
+}
+
+func (t *negativeAcksTracker) track() {
+ for {
+ select {
+ case <-t.doneCh:
+ log.Debug("Closing nack tracker")
+ return
+
+ case <-t.tick.C:
+ {
+ t.Lock()
+
+ now := time.Now()
+ msgIds := make([]messageID, 0)
+ for msgID, targetTime := range t.negativeAcks {
+ log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now)
+ if targetTime.Before(now) {
+ log.Debugf("Adding MsgId: %v", msgID)
+ msgIds = append(msgIds, msgID)
+ delete(t.negativeAcks, msgID)
+ }
+ }
+
+ t.Unlock()
+
+ if len(msgIds) > 0 {
+ t.rc.Redeliver(msgIds)
+ }
+ }
+
+ }
+ }
+}
+
+func (t *negativeAcksTracker) Close() {
+ t.doneCh <- nil
+}
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
new file mode 100644
index 0000000..733114a
--- /dev/null
+++ b/pulsar/negative_acks_tracker_test.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "github.com/stretchr/testify/assert"
+ "sort"
+ "sync"
+ "testing"
+ "time"
+)
+
+type nackMockedConsumer struct {
+ sync.Mutex
+ cond *sync.Cond
+ msgIds []messageID
+}
+
+func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
+ nmc.Lock()
+ if nmc.msgIds == nil {
+ nmc.msgIds = msgIds
+ sort.Slice(msgIds, func(i, j int) bool {
+ return msgIds[i].ledgerID < msgIds[j].entryID
+ })
+ nmc.cond.Signal()
+ }
+
+ nmc.Unlock()
+}
+
+func (nmc *nackMockedConsumer) Wait() []messageID {
+ nmc.Lock()
+ defer nmc.Unlock()
+ nmc.cond.Wait()
+
+ return nmc.msgIds
+}
+
+func TestNacksTracker(t *testing.T) {
+ nmc := &nackMockedConsumer{}
+ nmc.cond = sync.NewCond(nmc)
+ nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+
+ nacks.Add(&messageID{
+ ledgerID: 1,
+ entryID: 1,
+ batchIdx: 1,
+ })
+
+ nacks.Add(&messageID{
+ ledgerID: 2,
+ entryID: 2,
+ batchIdx: 1,
+ })
+
+ msgIds := nmc.Wait()
+
+ assert.Equal(t, 2, len(msgIds))
+ assert.Equal(t, int64(1), msgIds[0].ledgerID)
+ assert.Equal(t, int64(1), msgIds[0].entryID)
+ assert.Equal(t, int64(2), msgIds[1].ledgerID)
+ assert.Equal(t, int64(2), msgIds[1].entryID)
+
+ nacks.Close()
+}
+
+func TestNacksWithBatchesTracker(t *testing.T) {
+ nmc := &nackMockedConsumer{}
+ nmc.cond = sync.NewCond(nmc)
+ nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+
+ nacks.Add(&messageID{
+ ledgerID: 1,
+ entryID: 1,
+ batchIdx: 1,
+ })
+
+ nacks.Add(&messageID{
+ ledgerID: 1,
+ entryID: 1,
+ batchIdx: 2,
+ })
+
+ nacks.Add(&messageID{
+ ledgerID: 1,
+ entryID: 1,
+ batchIdx: 3,
+ })
+
+ nacks.Add(&messageID{
+ ledgerID: 2,
+ entryID: 2,
+ batchIdx: 1,
+ })
+
+ msgIds := nmc.Wait()
+
+ assert.Equal(t, 2, len(msgIds))
+ assert.Equal(t, int64(1), msgIds[0].ledgerID)
+ assert.Equal(t, int64(1), msgIds[0].entryID)
+ assert.Equal(t, int64(2), msgIds[1].ledgerID)
+ assert.Equal(t, int64(2), msgIds[1].entryID)
+
+ nacks.Close()
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 052e92a..1961172 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -434,8 +434,7 @@
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
assert.Nil(t, err)
- err = consumer.Ack(msg)
- assert.Nil(t, err)
+ consumer.Ack(msg)
msgCount++
}
assert.Equal(t, msgCount, numOfMessages/2)