// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build !windows
// +build !windows

package mmap

import (
	"context"
	"fmt"
	"os"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/sirupsen/logrus"

	"github.com/grandecola/mmap"

	"google.golang.org/protobuf/proto"

	"github.com/apache/skywalking-satellite/internal/pkg/config"
	"github.com/apache/skywalking-satellite/internal/pkg/log"
	"github.com/apache/skywalking-satellite/internal/satellite/event"
	"github.com/apache/skywalking-satellite/plugins/queue/api"
	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"

	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)

const (
	// data4KB is the fallback segment size, in bytes, that Initialize applies when the
	// configured segment size is smaller than one memory page.
	// NOTE(review): 131072 bytes is 128 KiB, not 4 KiB as the name suggests — confirm which one is intended.
	data4KB         = 131072
	// minimumSegments is the lower bound that Initialize enforces on MaxInMemSegments.
	minimumSegments = 4
	// Name is the plugin name returned by Queue.Name; Initialize also uses it as the
	// prefix of the queue name.
	Name            = "mmap-queue"
)

// Queue is a memory mapped queue to store the input data.
//
// The exported fields carry the plugin configuration (see their mapstructure tags);
// the unexported fields hold the runtime state that Initialize sets up and Close
// tears down.
type Queue struct {
	config.CommonFields
	SegmentSize           int   `mapstructure:"segment_size"`            // The size of each segment. The unit is byte. Initialize rounds it down to a multiple of the page size.
	MaxInMemSegments      int32 `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory. Initialize raises it to at least minimumSegments.
	QueueCapacitySegments int   `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
	FlushPeriod           int   `mapstructure:"flush_period"`            // The period between two timed flushes. The unit is ms.
	FlushCeilingNum       int   `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
	MaxEventSize          int   `mapstructure:"max_event_size"`          // The max size of the serialized input event in bytes; Enqueue rejects larger events.

	// running components
	queueName              string         // The queue name, built by Initialize from the plugin name and the pipe name.
	meta                   *meta.Metadata // The metadata file.
	segments               []*mmap.File   // The data files, one slot per capacity segment; nil slots are skipped by doFlush and Close.
	mmapCount              int32          // The number of the memory mapped files.
	unflushedNum           int            // The unflushed number.
	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
	markReadChannel        chan int64     // Transfer the read segmentID to do unmap operation.
	ready                  bool           // The status of the queue: set to true at the end of Initialize and to false by Close.
	locker                 []int32        // One slot per segment index, allocated by Initialize; presumably the state behind lockByIndex/unlockByIndex — confirm.

	// control components
	ctx        context.Context    // Parent ctx
	cancel     context.CancelFunc // Parent ctx cancel function
	showDownWg sync.WaitGroup     // The shutdown wait group; Close waits on it for the two goroutines started by Initialize.
}

// Name returns the plugin name of the queue, i.e. the package-level Name constant.
func (q *Queue) Name() string {
	return Name
}

// Description returns a human readable summary of the plugin, including the
// note that the Windows platform is not supported.
func (q *Queue) Description() string {
	const (
		summary = "This is a memory mapped queue to provide the persistent storage for the input event."
		caveat  = " Please note that this plugin does not support Windows platform."
	)
	return summary + caveat
}

// DefaultConfig returns the default configuration of the queue as a YAML document.
//
// The default segment_size is 262144 bytes (256K). Initialize rounds the
// segment size down to a multiple of the page size, so the default has to be
// page aligned to take effect as documented.
func (q *Queue) DefaultConfig() string {
	return `
# The size of each segment. Default value is 256K. The unit is Byte.
segment_size: 262144
# The max num of segments in memory. Default value is 10.
max_in_mem_segments: 10
# The capacity of Queue = segment_size * queue_capacity_segments.
queue_capacity_segments: 2000
# The period flush time. The unit is ms. Default value is 1 second.
flush_period: 1000
# The max number in one flush time.  Default value is 10000.
flush_ceiling_num: 10000
# The max size of the input event. Default value is 20k.
max_event_size: 20480
`
}

// Initialize normalizes the configuration, opens the metadata, maps the segments
// that are currently read and written, and starts the background goroutines.
//
// It returns an error when the metadata cannot be created or when one of the two
// initial segments cannot be obtained. In the latter case everything mapped so far
// is released again, so a failed Initialize does not leak mappings.
func (q *Queue) Initialize() error {
	// The size of each segment file should be a multiple of the page size, so round
	// it down. Anything smaller than one page (including zero and negative values)
	// falls back to the built-in size.
	pageSize := os.Getpagesize()
	q.SegmentSize -= q.SegmentSize % pageSize
	if q.SegmentSize < pageSize {
		q.SegmentSize = data4KB
	}
	// the minimum MaxInMemSegments value should be 4.
	if q.MaxInMemSegments < minimumSegments {
		q.MaxInMemSegments = minimumSegments
	}
	q.queueName = strings.Join([]string{q.Name(), q.PipeName}, "_")
	// load metadata and override the reading or writing offset by the committed or watermark offset.
	md, err := meta.NewMetaData(q.queueName, q.QueueCapacitySegments)
	if err != nil {
		return fmt.Errorf("error in creating the metadata: %w", err)
	}
	q.meta = md
	cmID, cmOffset := md.GetCommittedOffset()
	wmID, wmOffset := md.GetWatermarkOffset()
	md.PutWritingOffset(wmID, wmOffset)
	md.PutReadingOffset(cmID, cmOffset)
	// The per-segment lock slots are allocated together with the segment slots so
	// that they exist before any segment is touched.
	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
	q.locker = make([]int32, q.QueueCapacitySegments)
	// release undoes the mappings made so far; no goroutine is running yet when it is called.
	release := func() {
		for i, segment := range q.segments {
			if segment == nil {
				continue
			}
			if unmapErr := segment.Unmap(); unmapErr != nil {
				log.Logger.Errorf("cannot unmap the segments: %d, %v", i, unmapErr)
			}
			q.segments[i] = nil
		}
		if closeErr := q.meta.Close(); closeErr != nil {
			log.Logger.Errorf("cannot unmap the metadata: %v", closeErr)
		}
		q.meta = nil
	}
	// keep the reading or writing segments in the memory.
	if _, err := q.GetSegment(cmID); err != nil {
		release()
		return err
	}
	if _, err := q.GetSegment(wmID); err != nil {
		release()
		return err
	}
	// init components
	q.insufficientMemChannel = make(chan struct{})
	q.sufficientMemChannel = make(chan struct{})
	q.markReadChannel = make(chan int64, 1)
	q.flushChannel = make(chan struct{})
	q.ctx, q.cancel = context.WithCancel(context.Background())
	// async supported processes: Close waits for both goroutines through showDownWg.
	q.showDownWg.Add(2)
	go q.segmentSwapper()
	go q.flush()
	q.ready = true
	return nil
}

// Enqueue serializes the event and hands the bytes to the queue.
//
// It returns api.ErrClosed when the queue is not ready, the marshalling error
// when the event cannot be encoded, and an error when the encoded event is
// larger than MaxEventSize bytes.
func (q *Queue) Enqueue(e *v1.SniffData) error {
	if !q.ready {
		entry := log.Logger.WithField("pipe", q.PipeName)
		entry.Warnf("the enqueue operation would be ignored because the queue was closed.")
		return api.ErrClosed
	}
	payload, err := proto.Marshal(e)
	if err != nil {
		return err
	}
	if size := len(payload); size > q.MaxEventSize {
		return fmt.Errorf("cannot enqueue the event to the queue because the size %dB is over ceiling", size)
	}
	return q.enqueue(payload)
}

// Dequeue takes the next serialized event from the queue and decodes it.
//
// The returned SequenceEvent carries the decoded event together with an offset
// built by encodeOffset from the two position values that dequeue reports. It
// returns api.ErrClosed when the queue is not ready, and passes on the error of
// the underlying dequeue or of the decoding step.
func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
	if !q.ready {
		entry := log.Logger.WithField("pipe", q.PipeName)
		entry.Warnf("the dequeue operation would be ignored because the queue was closed.")
		return nil, api.ErrClosed
	}
	raw, segmentID, position, err := q.dequeue()
	if err != nil {
		return nil, err
	}
	sniffData := new(v1.SniffData)
	if err := proto.Unmarshal(raw, sniffData); err != nil {
		return nil, err
	}
	result := &api.SequenceEvent{
		Event:  sniffData,
		Offset: q.encodeOffset(segmentID, position),
	}
	return result, nil
}

// Close stops the queue and releases its mappings.
//
// It marks the queue as not ready, cancels the queue context, and waits for the
// background goroutines to finish. Every mapped segment is then flushed
// synchronously and unmapped, and finally the metadata is closed. Failures are
// logged rather than returned; the result is always nil.
func (q *Queue) Close() error {
	q.ready = false
	q.cancel()
	q.showDownWg.Wait()
	for i, segment := range q.segments {
		if segment == nil {
			continue
		}
		if err := segment.Flush(syscall.MS_SYNC); err != nil {
			log.Logger.Errorf("cannot flush the segments: %d, %v", i, err)
		}
		// Unmap even when the flush failed, so the mapping is not leaked.
		if err := segment.Unmap(); err != nil {
			log.Logger.Errorf("cannot unmap the segments: %d, %v", i, err)
		}
	}
	if err := q.meta.Close(); err != nil {
		log.Logger.Errorf("cannot unmap the metadata: %v", err)
	}
	return nil
}

// Ack records lastOffset as the committed offset of the queue.
//
// The call is ignored, with a warning, when the queue is not ready. An offset
// that cannot be decoded is logged and ignored as well: committing the zero
// values returned alongside the decode error would overwrite the committed
// offset with (0, 0).
func (q *Queue) Ack(lastOffset event.Offset) {
	if !q.ready {
		log.Logger.WithFields(logrus.Fields{
			"pipe":   q.CommonFields.PipeName,
			"offset": lastOffset,
		}).Warnf("the ack operation would be ignored because the queue was closed.")
		return
	}
	id, offset, err := q.decodeOffset(lastOffset)
	if err != nil {
		log.Logger.Errorf("cannot ack queue with the offset:%s, %v", lastOffset, err)
		return
	}
	q.meta.PutCommittedOffset(id, offset)
}

// flush controls the flush operation by timer or counter.
//
// It runs until the queue context is cancelled and calls doFlush whenever
//   - a signal arrives on flushChannel,
//   - FlushPeriod milliseconds have passed since the previous flush, or
//   - the context is cancelled (one last flush before returning).
//
// A single timer is reused for the whole lifetime of the goroutine and is
// re-armed after every flush, so the period is always measured from the end of
// the latest flush.
func (q *Queue) flush() {
	defer q.showDownWg.Done()
	period := time.Duration(q.FlushPeriod) * time.Millisecond
	timer := time.NewTimer(period)
	defer timer.Stop()
	for {
		select {
		case <-q.flushChannel:
			q.doFlush()
			// The timer may have fired while flushing: stop it and drain the
			// stale tick so that the next period starts from now.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(period)
		case <-timer.C:
			q.doFlush()
			timer.Reset(period)
		case <-q.ctx.Done():
			q.doFlush()
			return
		}
	}
}

// doFlush flushes the segment and meta files to the disk.
//
// Each segment slot is visited under its index lock; slots that hold a mapped
// file are flushed synchronously. Afterwards the current writing offset is
// stored as the watermark offset and the metadata is flushed. Failures are
// logged and do not interrupt the remaining work.
func (q *Queue) doFlush() {
	for idx := range q.segments {
		q.lockByIndex(idx)
		if segment := q.segments[idx]; segment != nil {
			if err := segment.Flush(syscall.MS_SYNC); err != nil {
				log.Logger.Errorf("cannot flush segment file: %v", err)
			}
		}
		q.unlockByIndex(idx)
	}
	writingID, writingOffset := q.meta.GetWritingOffset()
	q.meta.PutWatermarkOffset(writingID, writingOffset)
	if err := q.meta.Flush(); err != nil {
		log.Logger.Errorf("cannot flush meta file: %v", err)
	}
}

// isEmpty reports whether the reading position has caught up with the writing
// position, i.e. both the segment ID and the offset are equal.
func (q *Queue) isEmpty() bool {
	readID, readOffset := q.meta.GetReadingOffset()
	writeID, writeOffset := q.meta.GetWritingOffset()
	if readID != writeID {
		return false
	}
	return readOffset == writeOffset
}

// isFull reports whether the writing segment has reached the last segment the
// writer may use without running into the segment that is still being read.
func (q *Queue) isFull() bool {
	readID, _ := q.meta.GetReadingOffset()
	writeID, _ := q.meta.GetWritingOffset()
	// Keep spare segments ahead of the reader to promise data stability: one
	// segment plus the number of whole segments a maximum-size event can fill.
	reserved := int64(q.MaxEventSize/q.SegmentSize) + 1
	limit := readID + int64(q.QueueCapacitySegments) - reserved
	return writeID >= limit
}

// encodeOffset renders a position of the mmap queue as an event offset of the
// form "<id>-<offset>", with both numbers written in base 10.
func (q *Queue) encodeOffset(id, offset int64) event.Offset {
	idPart := strconv.FormatInt(id, 10)
	offsetPart := strconv.FormatInt(offset, 10)
	return event.Offset(strings.Join([]string{idPart, offsetPart}, "-"))
}

// decodeOffset parses an event offset of the form "<id>-<offset>" back into the
// two base-10 numbers it was built from.
//
// It returns an error when the value does not consist of exactly two parts
// separated by "-", or when one of the parts is not a valid 64-bit integer. On
// error both numbers are zero.
func (q *Queue) decodeOffset(val event.Offset) (id, offset int64, err error) {
	parts := strings.Split(string(val), "-")
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("the input offset string is illegal: %s", val)
	}
	if id, err = strconv.ParseInt(parts[0], 10, 64); err != nil {
		return 0, 0, err
	}
	if offset, err = strconv.ParseInt(parts[1], 10, 64); err != nil {
		return 0, 0, err
	}
	return id, offset, nil
}
