blob: 7c60b988fdd8cc1cdbb7fcdfdaf4bd2b66d18cf8 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package buffer
import (
"container/list"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
var (
ErrNotComplete = errors.New("socket: not complete event")
)
type Buffer struct {
dataEvents *list.List
detailEvents *list.List
validated bool // the events list is validated or not
eventLocker sync.RWMutex
head *Position
current *Position
// record the latest expired data id in connection for expire the older socket detail
// because the older socket detail may not be received in buffer
latestExpiredDataID uint64
}
type Position struct {
// element of the event list
element *list.Element
// bufIndex the buffer index of the element
bufIndex int
}
func (p *Position) String() string {
buffer := p.element.Value.(events.SocketDataBuffer)
return fmt.Sprintf("data id: %d, sequence: %d, buffer index: %d",
buffer.DataID(), buffer.DataSequence(), p.bufIndex)
}
func NewBuffer() *Buffer {
return &Buffer{
dataEvents: list.New(),
detailEvents: list.New(),
validated: false,
}
}
func (r *Buffer) FindFirstDataBuffer(dataID uint64) events.SocketDataBuffer {
for e := r.dataEvents.Front(); e != nil; e = e.Next() {
cur := e.Value.(events.SocketDataBuffer)
if cur.DataID() == dataID {
return cur
}
}
return nil
}
func (r *Buffer) Position() *Position {
return r.current.Clone()
}
func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
dataEvents := list.New()
detailEvents := list.New()
var firstDetailElement *list.Element
for nextElement := start.element; nextElement != end.element; nextElement = nextElement.Next() {
// found first matches detail event
if detailEvents.Len() == 0 || firstDetailElement == nil {
for e := r.detailEvents.Front(); e != nil; e = e.Next() {
if e.Value.(*events.SocketDetailEvent).DataID >= nextElement.Value.(events.SocketDataBuffer).DataID() {
detailEvents.PushBack(e.Value)
firstDetailElement = e
break
}
}
}
dataEvents.PushBack(nextElement.Value)
}
lastBuffer := end.element.Value.(events.SocketDataBuffer)
dataEvents.PushBack(&events.SocketDataEventLimited{SocketDataBuffer: lastBuffer, Size: end.bufIndex})
// if the first detail element been found, append the details until the last buffer data id
if firstDetailElement == nil {
for e := r.detailEvents.Front(); e != nil; e = e.Next() {
if e.Value.(*events.SocketDetailEvent).DataID == lastBuffer.DataID() {
detailEvents.PushBack(e.Value)
break
}
}
} else if firstDetailElement != nil && firstDetailElement.Value.(*events.SocketDetailEvent).DataID != lastBuffer.DataID() {
for tmp := firstDetailElement.Next(); tmp != nil; tmp = tmp.Next() {
if tmp.Value.(*events.SocketDetailEvent).DataID > lastBuffer.DataID() {
break
}
detailEvents.PushBack(tmp.Value)
}
}
return &Buffer{
dataEvents: dataEvents,
detailEvents: detailEvents,
validated: validated,
head: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex},
current: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex},
}
}
func (r *Buffer) Len() int {
if r == nil || r.head == nil {
return 0
}
var result int
var startIndex = r.head.bufIndex
for e := r.head.element; e != nil; e = e.Next() {
result += r.head.element.Value.(events.SocketDataBuffer).BufferLen() - startIndex
startIndex = 0
}
return result
}
func (r *Buffer) Details() *list.List {
return r.detailEvents
}
func (r *Buffer) FirstSocketBuffer() events.SocketDataBuffer {
if r.dataEvents.Len() == 0 {
return nil
}
return r.dataEvents.Front().Value.(events.SocketDataBuffer)
}
func (r *Buffer) LastSocketBuffer() events.SocketDataBuffer {
if r.dataEvents.Len() == 0 {
return nil
}
return r.dataEvents.Back().Value.(events.SocketDataBuffer)
}
// DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count
func (r *Buffer) DetectNotSendingLastPosition() *Position {
if r.dataEvents.Len() == 0 {
return nil
}
for e := r.dataEvents.Front(); e != nil; e = e.Next() {
buf := e.Value.(events.SocketDataBuffer)
// the buffer is sent finished but still have reduced data not send
if buf.IsFinished() && buf.HaveReduceDataAfterChunk() {
return &Position{element: e, bufIndex: buf.BufferLen()}
}
}
return nil
}
func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
if len(buffers) == 0 {
return nil
}
if len(buffers) == 1 {
return buffers[0]
}
dataEvents := list.New()
detailEvents := list.New()
for _, b := range buffers {
if b.head.bufIndex > 0 {
headBuffer := b.dataEvents.Front().Value.(events.SocketDataBuffer)
dataEvents.PushBack(&events.SocketDataEventLimited{SocketDataBuffer: headBuffer,
From: b.head.bufIndex, Size: headBuffer.BufferLen()})
for next := b.dataEvents.Front().Next(); next != nil; next = next.Next() {
dataEvents.PushBack(next.Value)
}
} else {
dataEvents.PushBackList(b.dataEvents)
}
detailEvents.PushBackList(b.detailEvents)
}
return &Buffer{
dataEvents: dataEvents,
detailEvents: detailEvents,
validated: validated,
head: &Position{element: dataEvents.Front(), bufIndex: 0},
current: &Position{element: dataEvents.Front(), bufIndex: 0},
}
}
func (r *Buffer) Peek(p []byte) (n int, err error) {
// save the index temporary
tmpPosition := r.current.Clone()
// restore the index
defer func() {
r.current = tmpPosition
}()
readIndex := 0
for readIndex < len(p) {
count, err := r.Read(p[readIndex:])
if err != nil {
return 0, err
}
readIndex += count
}
return readIndex, nil
}
func (r *Buffer) OffsetPosition(offset int) *Position {
var nextElement func(e *list.Element) *list.Element
if offset == 0 {
return r.current.Clone()
} else if offset > 0 {
nextElement = func(e *list.Element) *list.Element {
return e.Next()
}
} else {
nextElement = func(e *list.Element) *list.Element {
return e.Prev()
}
}
var curEle = r.current.element
var curIndex = r.current.bufIndex
for ; curEle != nil; curEle = nextElement(curEle) {
nextOffset := curIndex + offset
bufferLen := curEle.Value.(events.SocketDataBuffer).BufferLen()
if nextOffset >= 0 && nextOffset < bufferLen {
curIndex += offset
break
}
if offset > 0 {
offset -= bufferLen - curIndex
curIndex = 0
} else {
offset += curIndex
next := nextElement(curEle)
if next == nil {
curEle = next
break
}
curIndex = curEle.Value.(events.SocketDataBuffer).BufferLen()
}
}
if curEle == nil {
return nil
}
return &Position{element: curEle, bufIndex: curIndex}
}
func (r *Buffer) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
if r.current == nil || r.current.element == nil {
return 0, io.EOF
}
element, n := r.ReadFromCurrent(p)
if n > 0 {
return n, nil
}
curEvent := element.Value.(events.SocketDataBuffer)
next := r.nextElement(element)
if next == nil {
return 0, io.EOF
}
nextEvent := next.Value.(events.SocketDataBuffer)
var shouldRead = false
if r.validated {
shouldRead = true
// same data id and sequence orders
} else if (curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence()+1 == nextEvent.DataSequence()) ||
// cur event is finished and next event is start
(nextEvent.IsStart() && curEvent.IsFinished()) ||
// same data id and sequence but have difference buffer index
(curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence() == nextEvent.DataSequence() &&
r.current.bufIndex <= nextEvent.BufferStartPosition()) {
shouldRead = true
}
if !shouldRead {
return 0, ErrNotComplete
}
return r.read0(next, nextEvent, p)
}
func (r *Buffer) ReadFromCurrent(p []byte) (element *list.Element, n int) {
element = r.current.element
curEvent := element.Value.(events.SocketDataBuffer)
residueSize := curEvent.BufferLen() - r.current.bufIndex
if residueSize > 0 {
readLen := len(p)
if residueSize < readLen {
readLen = residueSize
}
n = copy(p, curEvent.BufferData()[r.current.bufIndex:r.current.bufIndex+readLen])
r.current.bufIndex += n
return element, n
}
return element, 0
}
func (r *Buffer) read0(currentElement *list.Element, currentBuffer events.SocketDataBuffer, p []byte) (n int, err error) {
readLen := len(p)
if currentBuffer.BufferLen() < readLen {
readLen = currentBuffer.BufferLen()
}
copy(p, currentBuffer.BufferData()[:readLen])
r.current.element = currentElement
r.current.bufIndex = readLen
return readLen, nil
}
// IsCurrentPacketReadFinished means to validate the current reading package is reading finished
func (r *Buffer) IsCurrentPacketReadFinished() bool {
return r.current.bufIndex == r.current.element.Value.(events.SocketDataBuffer).BufferLen()
}
func (r *Buffer) ResetForLoopReading() {
r.head = nil
r.current = nil
}
func (r *Buffer) PrepareForReading() bool {
if r.dataEvents.Len() == 0 {
return false
}
if r.head == nil || r.head.element == nil {
// read in the first element
r.eventLocker.RLock()
defer r.eventLocker.RUnlock()
r.head = &Position{element: r.dataEvents.Front(), bufIndex: 0}
r.current = r.head.Clone()
} else {
// make sure we can read from head
r.current = r.head.Clone()
}
return true
}
func (r *Buffer) RemoveReadElements() bool {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
// delete until the last data id
if r.head.element != nil && r.current.element != nil {
firstDataID := r.head.element.Value.(events.SocketDataBuffer).DataID()
lastDataID := r.current.element.Value.(events.SocketDataBuffer).DataID()
startDelete := false
for e := r.detailEvents.Front(); e != nil; {
event := e.Value.(*events.SocketDetailEvent)
if !startDelete && event.DataID >= firstDataID && event.DataID <= lastDataID {
startDelete = true
} else if startDelete && event.DataID > lastDataID {
// out of the data id, just break
break
}
if startDelete {
tmp := e.Next()
r.detailEvents.Remove(e)
e = tmp
} else {
e = e.Next()
}
}
}
// delete until to current position
next := r.head.element
for ; next != nil && next != r.current.element; next = r.removeElement0(next) {
}
if next != nil && next.Value.(events.SocketDataBuffer).BufferLen() == r.current.bufIndex {
// the last event already read finished, then delete it
r.head.element = r.removeElement0(next)
r.head.bufIndex = 0
} else if next != nil {
// keep using the latest element
r.head.element = next
} else {
return true
}
return false
}
// SkipCurrentElement skip current element in reader, if return true means have read finished
func (r *Buffer) SkipCurrentElement() bool {
r.head.element = r.nextElement(r.current.element)
r.current.bufIndex = 0
return r.head.element == nil
}
func (r *Buffer) removeElement0(element *list.Element) *list.Element {
if element == nil {
return nil
}
result := element.Next()
r.dataEvents.Remove(element)
return result
}
func (r *Buffer) AppendDetailEvent(event *events.SocketDetailEvent) {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
if r.detailEvents.Len() == 0 {
r.detailEvents.PushFront(event)
return
}
if r.detailEvents.Back().Value.(*events.SocketDetailEvent).DataID < event.DataID {
r.detailEvents.PushBack(event)
return
}
beenAdded := false
for element := r.detailEvents.Front(); element != nil; element = element.Next() {
existEvent := element.Value.(*events.SocketDetailEvent)
if existEvent.DataID > event.DataID {
// data id needs order
beenAdded = true
}
if beenAdded {
r.detailEvents.InsertBefore(event, element)
break
}
}
if !beenAdded {
r.detailEvents.PushBack(event)
}
}
// AppendDataEvent insert the event to the event list following the order
func (r *Buffer) AppendDataEvent(event *events.SocketDataUploadEvent) {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
if r.dataEvents.Len() == 0 {
r.dataEvents.PushFront(event)
return
}
if r.dataEvents.Back().Value.(events.SocketDataBuffer).DataID() < event.DataID() {
r.dataEvents.PushBack(event)
return
}
beenAdded := false
for element := r.dataEvents.Front(); element != nil; element = element.Next() {
existEvent := element.Value.(events.SocketDataBuffer)
if existEvent.DataID() > event.DataID() {
// data id needs order
beenAdded = true
} else if existEvent.DataID() == event.DataID() && existEvent.DataSequence() > event.DataSequence() {
// following the sequence order
beenAdded = true
}
if beenAdded {
r.dataEvents.InsertBefore(event, element)
break
}
}
if !beenAdded {
r.dataEvents.PushBack(event)
}
}
func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
expireTime := time.Now().Add(-expireDuration)
// data event queue
count := r.deleteEventsWithJudgement(r.dataEvents, func(element *list.Element) bool {
buffer := element.Value.(events.SocketDataBuffer)
startTime := host.Time(buffer.StartTime())
if expireTime.After(startTime) {
r.latestExpiredDataID = buffer.DataID()
return true
}
return false
})
// detail event queue
count += r.deleteEventsWithJudgement(r.detailEvents, func(element *list.Element) bool {
return r.latestExpiredDataID > 0 && element.Value.(*events.SocketDetailEvent).DataID <= r.latestExpiredDataID
})
return count
}
func (r *Buffer) DataLength() int {
return r.dataEvents.Len()
}
func (r *Buffer) DetailLength() int {
return r.detailEvents.Len()
}
func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element *list.Element) bool) int {
count := 0
for e := l.Front(); e != nil; {
if checker(e) {
count++
cur := e
e = e.Next()
l.Remove(cur)
} else {
break
}
}
return count
}
func (r *Buffer) nextElement(e *list.Element) *list.Element {
if e == nil {
return nil
}
r.eventLocker.RLock()
defer r.eventLocker.RUnlock()
return e.Next()
}
func (p *Position) Clone() *Position {
return &Position{element: p.element, bufIndex: p.bufIndex}
}