// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build !windows
// +build !windows

package mmap

import (
"context"
"fmt"
"os"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/grandecola/mmap"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"

"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
"github.com/apache/skywalking-satellite/plugins/queue/api"
"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)

const (
// data4KB is the fallback segment size, one typical 4KB page, used when the
// configured segment_size is smaller than the OS page size.
data4KB = 1024 * 4
// minimumSegments is the lower bound enforced on max_in_mem_segments.
minimumSegments = 4
// Name is the registered name of this queue plugin.
Name = "mmap-queue"
)

// Queue is a memory mapped queue to store the input data.
type Queue struct {
config.CommonFields
SegmentSize int `mapstructure:"segment_size"` // The size of each segment. The unit is byte.
MaxInMemSegments int32 `mapstructure:"max_in_mem_segments"` // The max num of segments in memory.
QueueCapacitySegments int `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
FlushPeriod int `mapstructure:"flush_period"` // The flush period. The unit is ms.
FlushCeilingNum int `mapstructure:"flush_ceiling_num"` // The max number of unflushed writes before a flush is forced.
MaxEventSize int `mapstructure:"max_event_size"` // The max size of the input event.
// running components
queueName string // The queue name.
meta *meta.Metadata // The metadata file.
segments []*mmap.File // The data files.
mmapCount int32 // The number of the memory mapped files.
unflushedNum int // The number of writes since the last flush.
flushChannel chan struct{} // The flush channel, which receives a signal when unflushedNum reaches flush_ceiling_num.
insufficientMemChannel chan struct{} // Notifies when memory is insufficient for mapping more segments.
sufficientMemChannel chan struct{} // Notifies when memory is sufficient again.
markReadChannel chan int64 // Transfers the ID of a fully read segment so that it can be unmapped.
ready bool // The readiness flag of the queue; false once the queue is closed.
locker []int32 // The per-segment locks guarding access to the mapped segment files.
// control components
ctx context.Context // Parent ctx
cancel context.CancelFunc // Parent ctx cancel function
showDownWg sync.WaitGroup // The shutdown wait group.
}
func (q *Queue) Name() string {
return Name
}
func (q *Queue) Description() string {
return "This is a memory mapped queue to provide the persistent storage for the input event." +
" Please note that this plugin does not support Windows platform."
}
func (q *Queue) DefaultConfig() string {
return `
# The size of each segment. Default value is 256K. The unit is Byte.
segment_size: 262144
# The max num of segments in memory. Default value is 10.
max_in_mem_segments: 10
# The capacity of Queue = segment_size * queue_capacity_segments.
queue_capacity_segments: 2000
# The flush period. The unit is ms. Default value is 1000 (1 second).
flush_period: 1000
# The max number of unflushed writes before a flush is forced. Default value is 10000.
flush_ceiling_num: 10000
# The max size of a single input event. Default value is 20KB.
max_event_size: 20480
`
}
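
// Rough sizing under the defaults above (an illustrative calculation, not a
// guarantee): 262144 B per segment * 2000 segments = 524288000 B, i.e. up to
// about 500 MiB of segment files on disk, plus the metadata file.
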
func (q *Queue) Initialize() error {
// the size of each segment file should be a multiple of the page size.
pageSize := os.Getpagesize()
if q.SegmentSize%pageSize != 0 {
q.SegmentSize -= q.SegmentSize % pageSize
}
if q.SegmentSize/pageSize == 0 {
q.SegmentSize = data4KB
}
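// For example, assuming the common 4096-byte page size: 262144 stays 262144,
// 10000 is rounded down to 8192, and 1000 falls back to data4KB because it is
// smaller than a single page.
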
// the minimum MaxInMemSegments value should be 4.
if q.MaxInMemSegments < minimumSegments {
q.MaxInMemSegments = minimumSegments
}
q.queueName = strings.Join([]string{q.Name(), q.PipeName}, "_")
// load the metadata, then rewind the writing offset to the watermark (the last
// durably flushed position) and the reading offset to the committed (last acked) position.
md, err := meta.NewMetaData(q.queueName, q.QueueCapacitySegments)
if err != nil {
return fmt.Errorf("error in creating the metadata: %v", err)
}
q.meta = md
cmID, cmOffset := md.GetCommittedOffset()
wmID, wmOffset := md.GetWatermarkOffset()
md.PutWritingOffset(wmID, wmOffset)
md.PutReadingOffset(cmID, cmOffset)
// keep the segments currently being read and written mapped in memory.
q.segments = make([]*mmap.File, q.QueueCapacitySegments)
if _, err := q.GetSegment(cmID); err != nil {
return err
}
if _, err := q.GetSegment(wmID); err != nil {
return err
}
// init components
q.insufficientMemChannel = make(chan struct{})
q.sufficientMemChannel = make(chan struct{})
q.markReadChannel = make(chan int64, 1)
q.flushChannel = make(chan struct{})
q.ctx, q.cancel = context.WithCancel(context.Background())
// start the asynchronous background goroutines: the segment swapper and the periodic flusher.
q.showDownWg.Add(2)
q.locker = make([]int32, q.QueueCapacitySegments)
go q.segmentSwapper()
go q.flush()
q.ready = true
return nil
}
func (q *Queue) Enqueue(e *v1.SniffData) error {
if !q.ready {
log.Logger.WithField("pipe", q.CommonFields.PipeName).Warnf("the enqueue operation is ignored because the queue has been closed.")
return api.ErrClosed
}
data, err := proto.Marshal(e)
if err != nil {
return err
}
if len(data) > q.MaxEventSize {
return fmt.Errorf("cannot enqueue the event to the queue because the size %dB is over ceiling", len(data))
}
return q.enqueue(data)
}
func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
if !q.ready {
log.Logger.WithField("pipe", q.CommonFields.PipeName).Warnf("the dequeue operation is ignored because the queue has been closed.")
return nil, api.ErrClosed
}
data, id, offset, err := q.dequeue()
if err != nil {
return nil, err
}
e := &v1.SniffData{}
err = proto.Unmarshal(data, e)
if err != nil {
return nil, err
}
return &api.SequenceEvent{
Event: e,
Offset: q.encodeOffset(id, offset),
}, nil
}
func (q *Queue) Close() error {
q.ready = false
q.cancel()
q.showDownWg.Wait()
for i, segment := range q.segments {
if segment != nil {
if err := segment.Flush(syscall.MS_SYNC); err != nil {
log.Logger.Errorf("cannot flush the segment %d: %v", i, err)
}
if err := segment.Unmap(); err != nil {
log.Logger.Errorf("cannot unmap the segment %d: %v", i, err)
}
}
}
if err := q.meta.Close(); err != nil {
log.Logger.Errorf("cannot unmap the metadata: %v", err)
}
return nil
}
func (q *Queue) Ack(lastOffset event.Offset) {
if !q.ready {
log.Logger.WithFields(logrus.Fields{
"pipe": q.CommonFields.PipeName,
"offset": lastOffset,
}).Warnf("the ack operation is ignored because the queue has been closed.")
return
}
id, offset, err := q.decodeOffset(lastOffset)
if err != nil {
log.Logger.Errorf("cannot ack the queue with the offset: %s, %v", lastOffset, err)
return
}
q.meta.PutCommittedOffset(id, offset)
}
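
// A typical consumer interaction with this queue, shown as an illustrative
// sketch: the real caller is the Satellite pipeline, and handle() is a
// hypothetical placeholder for downstream processing.
//
//	e, err := q.Dequeue()
//	if err != nil {
//		// retry later or give up, depending on the error
//	}
//	handle(e.Event) // process the *v1.SniffData
//	q.Ack(e.Offset) // advance the committed offset that reads resume from after a restart
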
// flush controls the flush operation with a periodic timer and a counter-triggered signal.
func (q *Queue) flush() {
defer q.showDownWg.Done()
ctx, _ := context.WithCancel(q.ctx) // nolint
for {
timer := time.NewTimer(time.Duration(q.FlushPeriod) * time.Millisecond)
select {
case <-q.flushChannel:
q.doFlush()
timer.Reset(time.Duration(q.FlushPeriod) * time.Millisecond)
case <-timer.C:
q.doFlush()
case <-ctx.Done():
q.doFlush()
return
}
}
}
// doFlush flushes the in-memory segment files and the metadata to disk, then records
// the current writing offset as the watermark so that restarts resume from durable data.
func (q *Queue) doFlush() {
for i := range q.segments {
q.lockByIndex(i)
if q.segments[i] == nil {
q.unlockByIndex(i)
continue
}
if err := q.segments[i].Flush(syscall.MS_SYNC); err != nil {
log.Logger.Errorf("cannot flush segment file: %v", err)
}
q.unlockByIndex(i)
}
wid, woffset := q.meta.GetWritingOffset()
q.meta.PutWatermarkOffset(wid, woffset)
if err := q.meta.Flush(); err != nil {
log.Logger.Errorf("cannot flush meta file: %v", err)
}
}
// isEmpty reports whether the reading offset has caught up with the writing offset.
func (q *Queue) isEmpty() bool {
rid, roffset := q.meta.GetReadingOffset()
wid, woffset := q.meta.GetWritingOffset()
return rid == wid && roffset == woffset
}
// isFull reports whether writing another event could run into unread data.
func (q *Queue) isFull() bool {
rid, _ := q.meta.GetReadingOffset()
wid, _ := q.meta.GetWritingOffset()
// reserve headroom for the largest possible event so that writing cannot overrun unread data.
maxWid := rid + int64(q.QueueCapacitySegments) - 1 - int64(q.MaxEventSize/q.SegmentSize)
return wid >= maxWid
}
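
// With the default configuration the bound above works out as follows:
// max_event_size/segment_size = 20480/262144 = 0 in integer division, so
// maxWid = rid + 2000 - 1 - 0 = rid + 1999; the writer may therefore run at
// most 1999 segments ahead of the reader before the queue reports full.
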
// encodeOffset encodes the segment ID and the in-segment offset into a textual offset, such as "3-4096".
func (q *Queue) encodeOffset(id, offset int64) event.Offset {
return event.Offset(strconv.FormatInt(id, 10) + "-" + strconv.FormatInt(offset, 10))
}
// decodeOffset parses a textual offset back into the segment ID and the in-segment offset of the mmap queue.
func (q *Queue) decodeOffset(val event.Offset) (id, offset int64, err error) {
arr := strings.Split(string(val), "-")
if len(arr) == 2 {
id, err := strconv.ParseInt(arr[0], 10, 64)
if err != nil {
return 0, 0, err
}
offset, err := strconv.ParseInt(arr[1], 10, 64)
if err != nil {
return 0, 0, err
}
return id, offset, nil
}
return 0, 0, fmt.Errorf("the input offset string is illegal: %s", val)
}
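
// exampleOffsetRoundTrip is an illustrative sketch, not part of the upstream
// plugin API: it shows how the textual offset produced by encodeOffset is
// parsed back by decodeOffset. The concrete values are made up.
func exampleOffsetRoundTrip(q *Queue) (id, offset int64, err error) {
off := q.encodeOffset(3, 4096) // produces the string "3-4096"
return q.decodeOffset(off)     // yields (3, 4096, nil)
}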