Added negative acks tracker and integration (#94)

* Added negative acks tracker and integration

* Fixed race condition in tests

* Fixed return after warning

* fixed format
diff --git a/examples/consumer-listener/consumer-listener.go b/examples/consumer-listener/consumer-listener.go
index 0a8e6c9..618c3c0 100644
--- a/examples/consumer-listener/consumer-listener.go
+++ b/examples/consumer-listener/consumer-listener.go
@@ -57,8 +57,6 @@
 		fmt.Printf("Received message  msgId: %v -- content: '%s'\n",
 			msg.ID(), string(msg.Payload()))
 
-		if err := consumer.Ack(msg); err != nil {
-			log.Fatal(err)
-		}
+		consumer.Ack(msg)
 	}
 }
diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go
index 5250a02..0b819bc 100644
--- a/examples/consumer/consumer.go
+++ b/examples/consumer/consumer.go
@@ -52,9 +52,7 @@
 		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
 			msg.ID(), string(msg.Payload()))
 
-		if err := consumer.Ack(msg); err != nil {
-			log.Fatal(err)
-		}
+		consumer.Ack(msg)
 	}
 
 	if err := consumer.Unsubscribe(); err != nil {
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 8fe2a86..582c06d 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -101,9 +101,7 @@
 			}
 			msgReceived++
 			bytesReceived += int64(len(cm.Message.Payload()))
-			if err := consumer.Ack(cm.Message); err != nil {
-				return
-			}
+			consumer.Ack(cm.Message)
 		case <-tick.C:
 			currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
 			currentBytesReceived := atomic.SwapInt64(&bytesReceived, 0)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index b0bba1b..961611e 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -19,6 +19,7 @@
 
 import (
 	"context"
+	"time"
 )
 
 // Pair of a Consumer and Message
@@ -106,6 +107,10 @@
 	// ReceiverQueueSize(int) if the total exceeds this value (default: 50000).
 	MaxTotalReceiverQueueSizeAcrossPartitions int
 
+	// The delay after which to redeliver the messages that failed to be
+	// processed. Default is 1min. (See `Consumer.Nack()`)
+	NackRedeliveryDelay *time.Duration
+
 	// Set the consumer name.
 	Name string
 
@@ -136,10 +141,28 @@
 	Chan() <-chan ConsumerMessage
 
 	// Ack the consumption of a single message
-	Ack(Message) error
+	Ack(Message)
 
 	// AckID the consumption of a single message, identified by its MessageID
-	AckID(MessageID) error
+	AckID(MessageID)
+
+	// Acknowledge the failure to process a single message.
+	//
+	// When a message is "negatively acked" it will be marked for redelivery after
+	// some fixed delay. The delay is configurable when constructing the consumer
+	// with ConsumerOptions.NAckRedeliveryDelay .
+	//
+	// This call is not blocking.
+	Nack(Message)
+
+	// Acknowledge the failure to process a single message.
+	//
+	// When a message is "negatively acked" it will be marked for redelivery after
+	// some fixed delay. The delay is configurable when constructing the consumer
+	// with ConsumerOptions.NackRedeliveryDelay .
+	//
+	// This call is not blocking.
+	NackID(MessageID)
 
 	// Close the consumer and stop the broker to push more messages
 	Close() error
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 8b73631..0a628b7 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -33,6 +33,8 @@
 
 var ErrConsumerClosed = errors.New("consumer closed")
 
+const defaultNackRedeliveryDelay = 1 * time.Minute
+
 type consumer struct {
 	options ConsumerOptions
 
@@ -117,6 +119,13 @@
 		wg.Add(1)
 		go func(idx int, pt string) {
 			defer wg.Done()
+
+			var nackRedeliveryDelay time.Duration
+			if options.NackRedeliveryDelay == nil {
+				nackRedeliveryDelay = defaultNackRedeliveryDelay
+			} else {
+				nackRedeliveryDelay = *options.NackRedeliveryDelay
+			}
 			opts := &partitionConsumerOpts{
 				topic:               pt,
 				consumerName:        consumerName,
@@ -125,6 +134,7 @@
 				subscriptionInitPos: options.SubscriptionInitialPosition,
 				partitionIdx:        idx,
 				receiverQueueSize:   receiverQueueSize,
+				nackRedeliveryDelay: nackRedeliveryDelay,
 			}
 			cons, err := newPartitionConsumer(consumer, client, opts, messageCh)
 			ch <- ConsumerError{
@@ -199,24 +209,48 @@
 }
 
 // Ack the consumption of a single message
-func (c *consumer) Ack(msg Message) error {
-	return c.AckID(msg.ID())
+func (c *consumer) Ack(msg Message) {
+	c.AckID(msg.ID())
 }
 
 // Ack the consumption of a single message, identified by its MessageID
-func (c *consumer) AckID(msgID MessageID) error {
+func (c *consumer) AckID(msgID MessageID) {
 	mid, ok := msgID.(*messageID)
 	if !ok {
-		return fmt.Errorf("invalid message id type")
+		c.log.Warnf("invalid message id type")
+		return
 	}
 
 	partition := mid.partitionIdx
 	// did we receive a valid partition index?
 	if partition < 0 || partition >= len(c.consumers) {
-		return fmt.Errorf("invalid partition index %d expected a partition between [0-%d]",
+		c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
 			partition, len(c.consumers))
+		return
 	}
-	return c.consumers[partition].AckID(msgID)
+	c.consumers[partition].AckID(mid)
+}
+
+func (c *consumer) Nack(msg Message) {
+	c.AckID(msg.ID())
+}
+
+func (c *consumer) NackID(msgID MessageID) {
+	mid, ok := msgID.(*messageID)
+	if !ok {
+		c.log.Warnf("invalid message id type")
+		return
+	}
+
+	partition := mid.partitionIdx
+	// did we receive a valid partition index?
+	if partition < 0 || partition >= len(c.consumers) {
+		c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
+			partition, len(c.consumers))
+		return
+	}
+
+	c.consumers[partition].NackID(mid)
 }
 
 func (c *consumer) Close() error {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index aaf6ade..a8d9b6c 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -47,6 +47,7 @@
 	subscriptionInitPos SubscriptionInitialPosition
 	partitionIdx        int
 	receiverQueueSize   int
+	nackRedeliveryDelay time.Duration
 }
 
 type partitionConsumer struct {
@@ -78,6 +79,8 @@
 	connectedCh chan struct{}
 	closeCh     chan struct{}
 
+	nackTracker *negativeAcksTracker
+
 	log *log.Entry
 }
 
@@ -101,6 +104,7 @@
 		log:            log.WithField("topic", options.topic),
 	}
 	pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
+	pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
 
 	err := pc.grabConn()
 	if err != nil {
@@ -143,19 +147,39 @@
 	pc.conn.DeleteConsumeHandler(pc.consumerID)
 }
 
-func (pc *partitionConsumer) Ack(msg Message) error {
-	return pc.AckID(msg.ID())
-}
-
-func (pc *partitionConsumer) AckID(msgID MessageID) error {
+func (pc *partitionConsumer) AckID(msgID *messageID) {
 	req := &ackRequest{
-		doneCh: make(chan struct{}),
-		msgID:  msgID,
+		msgID: msgID,
 	}
 	pc.eventsCh <- req
+}
 
-	<-req.doneCh
-	return req.err
+func (pc *partitionConsumer) NackID(msgID *messageID) {
+	pc.nackTracker.Add(msgID)
+}
+
+func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
+	pc.eventsCh <- &redeliveryRequest{msgIds}
+}
+
+func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
+	msgIds := req.msgIds
+	pc.log.Debug("Request redelivery after negative ack for messages", msgIds)
+
+	msgIdDataList := make([]*pb.MessageIdData, len(msgIds))
+	for i := 0; i < len(msgIds); i++ {
+		msgIdDataList[i] = &pb.MessageIdData{
+			LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)),
+			EntryId:  proto.Uint64(uint64(msgIds[i].entryID)),
+		}
+	}
+
+	requestID := internal.RequestIDNoResponse
+	pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID,
+		pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
+			ConsumerId: proto.Uint64(pc.consumerID),
+			MessageIds: msgIdDataList,
+		})
 }
 
 func (pc *partitionConsumer) Close() error {
@@ -172,28 +196,21 @@
 }
 
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
-	defer close(req.doneCh)
+	msgId := req.msgID
 
-	id := &pb.MessageIdData{}
-	messageIDs := make([]*pb.MessageIdData, 0)
-	err := proto.Unmarshal(req.msgID.Serialize(), id)
-	if err != nil {
-		pc.log.WithError(err).Error("unable to serialize message id")
-		req.err = err
+	messageIDs := make([]*pb.MessageIdData, 1)
+	messageIDs[0] = &pb.MessageIdData{
+		LedgerId: proto.Uint64(uint64(msgId.ledgerID)),
+		EntryId:  proto.Uint64(uint64(msgId.entryID)),
 	}
-
-	messageIDs = append(messageIDs, id)
 	requestID := internal.RequestIDNoResponse
 	cmdAck := &pb.CommandAck{
 		ConsumerId: proto.Uint64(pc.consumerID),
 		MessageId:  messageIDs,
 		AckType:    pb.CommandAck_Individual.Enum(),
 	}
-	_, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_ACK, cmdAck)
-	if err != nil {
-		pc.log.WithError(err).Errorf("failed to ack message_id=%s", id)
-		req.err = err
-	}
+
+	pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_ACK, cmdAck)
 }
 
 func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
@@ -369,9 +386,7 @@
 }
 
 type ackRequest struct {
-	doneCh chan struct{}
-	msgID  MessageID
-	err    error
+	msgID *messageID
 }
 
 type unsubscribeRequest struct {
@@ -384,6 +399,10 @@
 	err    error
 }
 
+type redeliveryRequest struct {
+	msgIds []messageID
+}
+
 func (pc *partitionConsumer) runEventsLoop() {
 	defer func() {
 		pc.log.Info("exiting events loop")
@@ -396,6 +415,8 @@
 			switch v := i.(type) {
 			case *ackRequest:
 				pc.internalAck(v)
+			case *redeliveryRequest:
+				pc.internalRedeliver(v)
 			case *unsubscribeRequest:
 				pc.internalUnsubscribe(v)
 			case *connectionClosed:
@@ -429,6 +450,7 @@
 		pc.log.Info("Closed consumer")
 		pc.state = consumerClosed
 		pc.conn.DeleteConsumeHandler(pc.consumerID)
+		pc.nackTracker.Close()
 		close(pc.closeCh)
 	}
 }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 33c2f15..c45b932 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -90,9 +90,7 @@
 		assert.Equal(t, expectProperties, msg.Properties())
 
 		// ack message
-		if err := consumer.Ack(msg); err != nil {
-			log.Fatal(err)
-		}
+		consumer.Ack(msg)
 	}
 }
 
@@ -163,8 +161,7 @@
 	for i := 0; i < numOfMessages; i++ {
 		msg, err := consumer.Receive(ctx)
 		assert.Nil(t, err)
-		err = consumer.Ack(msg)
-		assert.Nil(t, err)
+		consumer.Ack(msg)
 		count++
 	}
 
@@ -301,17 +298,13 @@
 				break
 			}
 			receivedConsumer1++
-			if err := consumer1.Ack(cm.Message); err != nil {
-				log.Fatal(err)
-			}
+			consumer1.Ack(cm.Message)
 		case cm, ok := <-consumer2.Chan():
 			if !ok {
 				break
 			}
 			receivedConsumer2++
-			if err := consumer2.Ack(cm.Message); err != nil {
-				log.Fatal(err)
-			}
+			consumer2.Ack(cm.Message)
 		}
 	}
 
@@ -372,9 +365,7 @@
 		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
 			msg.ID(), string(msg.Payload()))
 
-		if err := consumer.Ack(msg); err != nil {
-			assert.Nil(t, err)
-		}
+		consumer.Ack(msg)
 	}
 
 	assert.Equal(t, len(msgs), 10)
@@ -465,9 +456,7 @@
 			payload := string(cm.Message.Payload())
 			messages[payload] = struct{}{}
 			fmt.Printf("consumer1 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
-			if err := consumer1.Ack(cm.Message); err != nil {
-				log.Fatal(err)
-			}
+			consumer1.Ack(cm.Message)
 		case cm, ok := <-consumer2.Chan():
 			if !ok {
 				break
@@ -476,9 +465,7 @@
 			payload := string(cm.Message.Payload())
 			messages[payload] = struct{}{}
 			fmt.Printf("consumer2 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
-			if err := consumer2.Ack(cm.Message); err != nil {
-				log.Fatal(err)
-			}
+			consumer2.Ack(cm.Message)
 		}
 	}
 
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index d806087..b0620c2 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -146,9 +146,9 @@
 
 func (p *producer) Flush() error {
 	for _, pp := range p.producers {
-		 if err :=  pp.Flush(); err != nil {
-		 	return err
-		 }
+		if err := pp.Flush(); err != nil {
+			return err
+		}
 
 	}
 	return nil
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
new file mode 100644
index 0000000..e8a79e8
--- /dev/null
+++ b/pulsar/negative_acks_tracker.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	log "github.com/sirupsen/logrus"
+	"sync"
+	"time"
+)
+
+type redeliveryConsumer interface {
+	Redeliver(msgIds []messageID)
+}
+
+type negativeAcksTracker struct {
+	sync.Mutex
+
+	doneCh       chan interface{}
+	negativeAcks map[messageID]time.Time
+	rc           redeliveryConsumer
+	tick         *time.Ticker
+	delay        time.Duration
+}
+
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration) *negativeAcksTracker {
+	t := &negativeAcksTracker{
+		doneCh:       make(chan interface{}),
+		negativeAcks: make(map[messageID]time.Time),
+		rc:           rc,
+		tick:         time.NewTicker(delay / 3),
+		delay:        delay,
+	}
+
+	go t.track()
+	return t
+}
+
+func (t *negativeAcksTracker) Add(msgID *messageID) {
+	// Always clear up the batch index since we want to track the nack
+	// for the entire batch
+	batchMsgId := messageID{
+		ledgerID: msgID.ledgerID,
+		entryID:  msgID.entryID,
+		batchIdx: 0,
+	}
+
+	t.Lock()
+	defer t.Unlock()
+
+	_, present := t.negativeAcks[batchMsgId]
+	if present {
+		// The batch is already being tracked
+		return
+	} else {
+		targetTime := time.Now().Add(t.delay)
+		t.negativeAcks[batchMsgId] = targetTime
+	}
+}
+
+func (t *negativeAcksTracker) track() {
+	for {
+		select {
+		case <-t.doneCh:
+			log.Debug("Closing nack tracker")
+			return
+
+		case <-t.tick.C:
+			{
+				t.Lock()
+
+				now := time.Now()
+				msgIds := make([]messageID, 0)
+				for msgID, targetTime := range t.negativeAcks {
+					log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now)
+					if targetTime.Before(now) {
+						log.Debugf("Adding MsgId: %v", msgID)
+						msgIds = append(msgIds, msgID)
+						delete(t.negativeAcks, msgID)
+					}
+				}
+
+				t.Unlock()
+
+				if len(msgIds) > 0 {
+					t.rc.Redeliver(msgIds)
+				}
+			}
+
+		}
+	}
+}
+
+func (t *negativeAcksTracker) Close() {
+	t.doneCh <- nil
+}
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
new file mode 100644
index 0000000..733114a
--- /dev/null
+++ b/pulsar/negative_acks_tracker_test.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"github.com/stretchr/testify/assert"
+	"sort"
+	"sync"
+	"testing"
+	"time"
+)
+
+type nackMockedConsumer struct {
+	sync.Mutex
+	cond   *sync.Cond
+	msgIds []messageID
+}
+
+func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
+	nmc.Lock()
+	if nmc.msgIds == nil {
+		nmc.msgIds = msgIds
+		sort.Slice(msgIds, func(i, j int) bool {
+			return msgIds[i].ledgerID < msgIds[j].entryID
+		})
+		nmc.cond.Signal()
+	}
+
+	nmc.Unlock()
+}
+
+func (nmc *nackMockedConsumer) Wait() []messageID {
+	nmc.Lock()
+	defer nmc.Unlock()
+	nmc.cond.Wait()
+
+	return nmc.msgIds
+}
+
+func TestNacksTracker(t *testing.T) {
+	nmc := &nackMockedConsumer{}
+	nmc.cond = sync.NewCond(nmc)
+	nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+
+	nacks.Add(&messageID{
+		ledgerID: 1,
+		entryID:  1,
+		batchIdx: 1,
+	})
+
+	nacks.Add(&messageID{
+		ledgerID: 2,
+		entryID:  2,
+		batchIdx: 1,
+	})
+
+	msgIds := nmc.Wait()
+
+	assert.Equal(t, 2, len(msgIds))
+	assert.Equal(t, int64(1), msgIds[0].ledgerID)
+	assert.Equal(t, int64(1), msgIds[0].entryID)
+	assert.Equal(t, int64(2), msgIds[1].ledgerID)
+	assert.Equal(t, int64(2), msgIds[1].entryID)
+
+	nacks.Close()
+}
+
+func TestNacksWithBatchesTracker(t *testing.T) {
+	nmc := &nackMockedConsumer{}
+	nmc.cond = sync.NewCond(nmc)
+	nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+
+	nacks.Add(&messageID{
+		ledgerID: 1,
+		entryID:  1,
+		batchIdx: 1,
+	})
+
+	nacks.Add(&messageID{
+		ledgerID: 1,
+		entryID:  1,
+		batchIdx: 2,
+	})
+
+	nacks.Add(&messageID{
+		ledgerID: 1,
+		entryID:  1,
+		batchIdx: 3,
+	})
+
+	nacks.Add(&messageID{
+		ledgerID: 2,
+		entryID:  2,
+		batchIdx: 1,
+	})
+
+	msgIds := nmc.Wait()
+
+	assert.Equal(t, 2, len(msgIds))
+	assert.Equal(t, int64(1), msgIds[0].ledgerID)
+	assert.Equal(t, int64(1), msgIds[0].entryID)
+	assert.Equal(t, int64(2), msgIds[1].ledgerID)
+	assert.Equal(t, int64(2), msgIds[1].entryID)
+
+	nacks.Close()
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 052e92a..1961172 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -434,8 +434,7 @@
 		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
 			msg.ID(), string(msg.Payload()))
 		assert.Nil(t, err)
-		err = consumer.Ack(msg)
-		assert.Nil(t, err)
+		consumer.Ack(msg)
 		msgCount++
 	}
 	assert.Equal(t, msgCount, numOfMessages/2)