// MIT License
//
// Copyright (c) 2018 Aman Mangal
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

// +build !windows

package mmap

import (
	"github.com/apache/skywalking-satellite/plugins/queue/api"
)

// Because the design of the mmap-queue in Satellite references the design of the
// bigqueue(https://github.com/grandecola/bigqueue), the queue operation file retains
// the original author license.
//
// The reason why we references the source codes of bigqueue rather than using the lib
// is the file queue in Satellite is like following.
// 1. Only one consumer and publisher in the Satellite queue.
// 2. Reusing files strategy is required to reduce the creation times in the Satellite queue.
// 3. More complex OFFSET design is needed to ensure the final stability of data.

const uInt64Size = 8

// enqueue writes the data into the file system. It first writes the length of the data,
// then the data itself. It means the whole data may not exist in the one segments.
func (q *Queue) enqueue(bytes []byte) error {
	q.lock.Lock()
	defer q.lock.Unlock()
	if q.isFull() {
		return api.ErrFull
	}
	id, offset := q.meta.GetWritingOffset()
	id, offset, err := q.writeLength(len(bytes), id, offset)
	if err != nil {
		return err
	}
	id, offset, err = q.writeBytes(bytes, id, offset)
	if err != nil {
		return err
	}
	q.meta.PutWritingOffset(id, offset)
	q.unflushedNum++
	if q.unflushedNum == q.FlushCeilingNum {
		q.flushChannel <- struct{}{}
		q.unflushedNum = 0
	}
	return nil
}

// dequeue reads the data from the file system. It first reads the length of the data,
// then the data itself. It means the whole data may not exist in the one segments.
func (q *Queue) dequeue() (data []byte, rid, roffset int64, err error) {
	q.lock.Lock()
	defer q.lock.Unlock()
	if q.isEmpty() {
		return nil, 0, 0, api.ErrEmpty
	}
	preID, preOffset := q.meta.GetReadingOffset()
	id, offset, length, err := q.readLength(preID, preOffset)
	if err != nil {
		return nil, 0, 0, err
	}
	bytes, id, offset, err := q.readBytes(id, offset, length)
	if err != nil {
		return nil, 0, 0, err
	}
	q.meta.PutReadingOffset(id, offset)
	if id != preID {
		q.markReadChannel <- preID
	}
	return bytes, id, offset, nil
}

// readBytes reads bytes into the memory mapped file.
func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
	counter := 0
	res := make([]byte, length)
	for {
		segment, err := q.GetSegment(id)
		if err != nil {
			return nil, 0, 0, err
		}
		readBytes, err := segment.ReadAt(res[counter:], offset)
		if err != nil {
			return nil, 0, 0, err
		}
		counter += readBytes
		offset += int64(readBytes)
		if offset == int64(q.SegmentSize) {
			id, offset = id+1, 0
		}
		if counter == length {
			break
		}
	}
	return res, id, offset, nil
}

// readLength reads the data length with 8 Bits spaces.
func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) {
	if offset+uInt64Size > int64(q.SegmentSize) {
		id, offset = id+1, 0
	}
	segment, err := q.GetSegment(id)
	if err != nil {
		return 0, 0, 0, err
	}
	num := segment.ReadUint64At(offset)
	offset += uInt64Size
	if offset == int64(q.SegmentSize) {
		id, offset = id+1, 0
	}
	return id, offset, int(num), nil
}

// writeLength write the data length with 8 Bits spaces.
func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) {
	if offset+uInt64Size > int64(q.SegmentSize) {
		id, offset = id+1, 0
	}
	segment, err := q.GetSegment(id)
	if err != nil {
		return 0, 0, err
	}
	segment.WriteUint64At(uint64(length), offset)
	offset += uInt64Size
	if offset == int64(q.SegmentSize) {
		id, offset = id+1, 0
	}
	return id, offset, nil
}

// writeBytes writes bytes into the memory mapped file.
func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset int64, err error) {
	counter := 0
	length := len(bytes)

	for {
		segment, err := q.GetSegment(id)
		if err != nil {
			return 0, 0, err
		}
		writtenBytes, err := segment.WriteAt(bytes[counter:], offset)
		if err != nil {
			return 0, 0, err
		}
		counter += writtenBytes
		offset += int64(writtenBytes)
		if offset == int64(q.SegmentSize) {
			id, offset = id+1, 0
		}
		if counter == length {
			break
		}
	}
	return id, offset, nil
}
