// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
	log "github.com/sirupsen/logrus"
	"sync"
	"time"
)

type redeliveryConsumer interface {
	Redeliver(msgIds []messageID)
}

type negativeAcksTracker struct {
	sync.Mutex

	doneCh       chan interface{}
	negativeAcks map[messageID]time.Time
	rc           redeliveryConsumer
	tick         *time.Ticker
	delay        time.Duration
}

func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration) *negativeAcksTracker {
	t := &negativeAcksTracker{
		doneCh:       make(chan interface{}),
		negativeAcks: make(map[messageID]time.Time),
		rc:           rc,
		tick:         time.NewTicker(delay / 3),
		delay:        delay,
	}

	go t.track()
	return t
}

func (t *negativeAcksTracker) Add(msgID *messageID) {
	// Always clear up the batch index since we want to track the nack
	// for the entire batch
	batchMsgId := messageID{
		ledgerID: msgID.ledgerID,
		entryID:  msgID.entryID,
		batchIdx: 0,
	}

	t.Lock()
	defer t.Unlock()

	_, present := t.negativeAcks[batchMsgId]
	if present {
		// The batch is already being tracked
		return
	} else {
		targetTime := time.Now().Add(t.delay)
		t.negativeAcks[batchMsgId] = targetTime
	}
}

func (t *negativeAcksTracker) track() {
	for {
		select {
		case <-t.doneCh:
			log.Debug("Closing nack tracker")
			return

		case <-t.tick.C:
			{
				t.Lock()

				now := time.Now()
				msgIds := make([]messageID, 0)
				for msgID, targetTime := range t.negativeAcks {
					log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now)
					if targetTime.Before(now) {
						log.Debugf("Adding MsgId: %v", msgID)
						msgIds = append(msgIds, msgID)
						delete(t.negativeAcks, msgID)
					}
				}

				t.Unlock()

				if len(msgIds) > 0 {
					t.rc.Redeliver(msgIds)
				}
			}

		}
	}
}

func (t *negativeAcksTracker) Close() {
	t.doneCh <- nil
}
