blob: a7dc88e3f48a7ff5505d7c781d0c3163958f2121 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type redeliveryConsumer interface {
Redeliver(msgIds []messageID)
}
type negativeAcksTracker struct {
sync.Mutex
doneCh chan interface{}
negativeAcks map[messageID]time.Time
rc redeliveryConsumer
tick *time.Ticker
delay time.Duration
}
func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration) *negativeAcksTracker {
t := &negativeAcksTracker{
doneCh: make(chan interface{}),
negativeAcks: make(map[messageID]time.Time),
rc: rc,
tick: time.NewTicker(delay / 3),
delay: delay,
}
go t.track()
return t
}
func (t *negativeAcksTracker) Add(msgID messageID) {
// Always clear up the batch index since we want to track the nack
// for the entire batch
batchMsgID := messageID{
ledgerID: msgID.ledgerID,
entryID: msgID.entryID,
batchIdx: 0,
}
t.Lock()
defer t.Unlock()
_, present := t.negativeAcks[batchMsgID]
if present {
// The batch is already being tracked
return
}
targetTime := time.Now().Add(t.delay)
t.negativeAcks[batchMsgID] = targetTime
}
func (t *negativeAcksTracker) track() {
for {
select {
case <-t.doneCh:
log.Debug("Closing nack tracker")
return
case <-t.tick.C:
{
t.Lock()
now := time.Now()
msgIds := make([]messageID, 0)
for msgID, targetTime := range t.negativeAcks {
log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now)
if targetTime.Before(now) {
log.Debugf("Adding MsgId: %v", msgID)
msgIds = append(msgIds, msgID)
delete(t.negativeAcks, msgID)
}
}
t.Unlock()
if len(msgIds) > 0 {
t.rc.Redeliver(msgIds)
}
}
}
}
}
func (t *negativeAcksTracker) Close() {
t.doneCh <- nil
}