| // Licensed to Apache Software Foundation (ASF) under one or more contributor |
| // license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright |
| // ownership. Apache Software Foundation (ASF) licenses this file to you under |
| // the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package buffer |
| |
| import ( |
| "container/list" |
| "errors" |
| "fmt" |
| "io" |
| "sync" |
| "time" |
| |
| "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events" |
| "github.com/apache/skywalking-rover/pkg/tools/host" |
| ) |
| |
| var ( |
| ErrNotComplete = errors.New("socket: not complete event") |
| ) |
| |
| type Buffer struct { |
| dataEvents *list.List |
| detailEvents *list.List |
| validated bool // the events list is validated or not |
| |
| eventLocker sync.RWMutex |
| |
| head *Position |
| current *Position |
| |
| // record the latest expired data id in connection for expire the older socket detail |
| // because the older socket detail may not be received in buffer |
| latestExpiredDataID uint64 |
| } |
| |
| type Position struct { |
| // element of the event list |
| element *list.Element |
| // bufIndex the buffer index of the element |
| bufIndex int |
| } |
| |
| func (p *Position) String() string { |
| buffer := p.element.Value.(events.SocketDataBuffer) |
| return fmt.Sprintf("data id: %d, sequence: %d, buffer index: %d", |
| buffer.DataID(), buffer.DataSequence(), p.bufIndex) |
| } |
| |
| func NewBuffer() *Buffer { |
| return &Buffer{ |
| dataEvents: list.New(), |
| detailEvents: list.New(), |
| validated: false, |
| } |
| } |
| |
| func (r *Buffer) FindFirstDataBuffer(dataID uint64) events.SocketDataBuffer { |
| for e := r.dataEvents.Front(); e != nil; e = e.Next() { |
| cur := e.Value.(events.SocketDataBuffer) |
| if cur.DataID() == dataID { |
| return cur |
| } |
| } |
| return nil |
| } |
| |
| func (r *Buffer) Position() *Position { |
| return r.current.Clone() |
| } |
| |
| func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer { |
| dataEvents := list.New() |
| detailEvents := list.New() |
| var firstDetailElement *list.Element |
| for nextElement := start.element; nextElement != end.element; nextElement = nextElement.Next() { |
| // found first matches detail event |
| if detailEvents.Len() == 0 || firstDetailElement == nil { |
| for e := r.detailEvents.Front(); e != nil; e = e.Next() { |
| if e.Value.(*events.SocketDetailEvent).DataID >= nextElement.Value.(events.SocketDataBuffer).DataID() { |
| detailEvents.PushBack(e.Value) |
| firstDetailElement = e |
| break |
| } |
| } |
| } |
| dataEvents.PushBack(nextElement.Value) |
| } |
| lastBuffer := end.element.Value.(events.SocketDataBuffer) |
| dataEvents.PushBack(&events.SocketDataEventLimited{SocketDataBuffer: lastBuffer, Size: end.bufIndex}) |
| |
| // if the first detail element been found, append the details until the last buffer data id |
| if firstDetailElement == nil { |
| for e := r.detailEvents.Front(); e != nil; e = e.Next() { |
| if e.Value.(*events.SocketDetailEvent).DataID == lastBuffer.DataID() { |
| detailEvents.PushBack(e.Value) |
| break |
| } |
| } |
| } else if firstDetailElement != nil && firstDetailElement.Value.(*events.SocketDetailEvent).DataID != lastBuffer.DataID() { |
| for tmp := firstDetailElement.Next(); tmp != nil; tmp = tmp.Next() { |
| if tmp.Value.(*events.SocketDetailEvent).DataID > lastBuffer.DataID() { |
| break |
| } |
| detailEvents.PushBack(tmp.Value) |
| } |
| } |
| |
| return &Buffer{ |
| dataEvents: dataEvents, |
| detailEvents: detailEvents, |
| validated: validated, |
| head: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, |
| current: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, |
| } |
| } |
| |
| func (r *Buffer) Len() int { |
| if r == nil || r.head == nil { |
| return 0 |
| } |
| var result int |
| var startIndex = r.head.bufIndex |
| for e := r.head.element; e != nil; e = e.Next() { |
| result += r.head.element.Value.(events.SocketDataBuffer).BufferLen() - startIndex |
| startIndex = 0 |
| } |
| return result |
| } |
| |
| func (r *Buffer) Details() *list.List { |
| return r.detailEvents |
| } |
| |
| func (r *Buffer) FirstSocketBuffer() events.SocketDataBuffer { |
| if r.dataEvents.Len() == 0 { |
| return nil |
| } |
| return r.dataEvents.Front().Value.(events.SocketDataBuffer) |
| } |
| |
| func (r *Buffer) LastSocketBuffer() events.SocketDataBuffer { |
| if r.dataEvents.Len() == 0 { |
| return nil |
| } |
| return r.dataEvents.Back().Value.(events.SocketDataBuffer) |
| } |
| |
| // DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count |
| func (r *Buffer) DetectNotSendingLastPosition() *Position { |
| if r.dataEvents.Len() == 0 { |
| return nil |
| } |
| |
| for e := r.dataEvents.Front(); e != nil; e = e.Next() { |
| buf := e.Value.(events.SocketDataBuffer) |
| // the buffer is sent finished but still have reduced data not send |
| if buf.IsFinished() && buf.HaveReduceDataAfterChunk() { |
| return &Position{element: e, bufIndex: buf.BufferLen()} |
| } |
| } |
| return nil |
| } |
| |
| func CombineSlices(validated bool, buffers ...*Buffer) *Buffer { |
| if len(buffers) == 0 { |
| return nil |
| } |
| if len(buffers) == 1 { |
| return buffers[0] |
| } |
| dataEvents := list.New() |
| detailEvents := list.New() |
| for _, b := range buffers { |
| if b.head.bufIndex > 0 { |
| headBuffer := b.dataEvents.Front().Value.(events.SocketDataBuffer) |
| dataEvents.PushBack(&events.SocketDataEventLimited{SocketDataBuffer: headBuffer, |
| From: b.head.bufIndex, Size: headBuffer.BufferLen()}) |
| for next := b.dataEvents.Front().Next(); next != nil; next = next.Next() { |
| dataEvents.PushBack(next.Value) |
| } |
| } else { |
| dataEvents.PushBackList(b.dataEvents) |
| } |
| detailEvents.PushBackList(b.detailEvents) |
| } |
| |
| return &Buffer{ |
| dataEvents: dataEvents, |
| detailEvents: detailEvents, |
| validated: validated, |
| head: &Position{element: dataEvents.Front(), bufIndex: 0}, |
| current: &Position{element: dataEvents.Front(), bufIndex: 0}, |
| } |
| } |
| |
| func (r *Buffer) Peek(p []byte) (n int, err error) { |
| // save the index temporary |
| tmpPosition := r.current.Clone() |
| // restore the index |
| defer func() { |
| r.current = tmpPosition |
| }() |
| readIndex := 0 |
| for readIndex < len(p) { |
| count, err := r.Read(p[readIndex:]) |
| if err != nil { |
| return 0, err |
| } |
| readIndex += count |
| } |
| return readIndex, nil |
| } |
| |
| func (r *Buffer) OffsetPosition(offset int) *Position { |
| var nextElement func(e *list.Element) *list.Element |
| if offset == 0 { |
| return r.current.Clone() |
| } else if offset > 0 { |
| nextElement = func(e *list.Element) *list.Element { |
| return e.Next() |
| } |
| } else { |
| nextElement = func(e *list.Element) *list.Element { |
| return e.Prev() |
| } |
| } |
| |
| var curEle = r.current.element |
| var curIndex = r.current.bufIndex |
| for ; curEle != nil; curEle = nextElement(curEle) { |
| nextOffset := curIndex + offset |
| bufferLen := curEle.Value.(events.SocketDataBuffer).BufferLen() |
| if nextOffset >= 0 && nextOffset < bufferLen { |
| curIndex += offset |
| break |
| } |
| |
| if offset > 0 { |
| offset -= bufferLen - curIndex |
| curIndex = 0 |
| } else { |
| offset += curIndex |
| next := nextElement(curEle) |
| if next == nil { |
| curEle = next |
| break |
| } |
| curIndex = curEle.Value.(events.SocketDataBuffer).BufferLen() |
| } |
| } |
| |
| if curEle == nil { |
| return nil |
| } |
| return &Position{element: curEle, bufIndex: curIndex} |
| } |
| |
| func (r *Buffer) Read(p []byte) (n int, err error) { |
| if len(p) == 0 { |
| return 0, nil |
| } |
| if r.current == nil || r.current.element == nil { |
| return 0, io.EOF |
| } |
| element, n := r.ReadFromCurrent(p) |
| if n > 0 { |
| return n, nil |
| } |
| |
| curEvent := element.Value.(events.SocketDataBuffer) |
| next := r.nextElement(element) |
| if next == nil { |
| return 0, io.EOF |
| } |
| nextEvent := next.Value.(events.SocketDataBuffer) |
| |
| var shouldRead = false |
| if r.validated { |
| shouldRead = true |
| // same data id and sequence orders |
| } else if (curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence()+1 == nextEvent.DataSequence()) || |
| // cur event is finished and next event is start |
| (nextEvent.IsStart() && curEvent.IsFinished()) || |
| // same data id and sequence but have difference buffer index |
| (curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence() == nextEvent.DataSequence() && |
| r.current.bufIndex <= nextEvent.BufferStartPosition()) { |
| shouldRead = true |
| } |
| |
| if !shouldRead { |
| return 0, ErrNotComplete |
| } |
| |
| return r.read0(next, nextEvent, p) |
| } |
| |
| func (r *Buffer) ReadFromCurrent(p []byte) (element *list.Element, n int) { |
| element = r.current.element |
| curEvent := element.Value.(events.SocketDataBuffer) |
| residueSize := curEvent.BufferLen() - r.current.bufIndex |
| if residueSize > 0 { |
| readLen := len(p) |
| if residueSize < readLen { |
| readLen = residueSize |
| } |
| |
| n = copy(p, curEvent.BufferData()[r.current.bufIndex:r.current.bufIndex+readLen]) |
| r.current.bufIndex += n |
| return element, n |
| } |
| return element, 0 |
| } |
| |
| func (r *Buffer) read0(currentElement *list.Element, currentBuffer events.SocketDataBuffer, p []byte) (n int, err error) { |
| readLen := len(p) |
| if currentBuffer.BufferLen() < readLen { |
| readLen = currentBuffer.BufferLen() |
| } |
| |
| copy(p, currentBuffer.BufferData()[:readLen]) |
| r.current.element = currentElement |
| r.current.bufIndex = readLen |
| return readLen, nil |
| } |
| |
| // IsCurrentPacketReadFinished means to validate the current reading package is reading finished |
| func (r *Buffer) IsCurrentPacketReadFinished() bool { |
| return r.current.bufIndex == r.current.element.Value.(events.SocketDataBuffer).BufferLen() |
| } |
| |
| func (r *Buffer) ResetForLoopReading() { |
| r.head = nil |
| r.current = nil |
| } |
| |
| func (r *Buffer) PrepareForReading() bool { |
| if r.dataEvents.Len() == 0 { |
| return false |
| } |
| if r.head == nil || r.head.element == nil { |
| // read in the first element |
| r.eventLocker.RLock() |
| defer r.eventLocker.RUnlock() |
| r.head = &Position{element: r.dataEvents.Front(), bufIndex: 0} |
| r.current = r.head.Clone() |
| } else { |
| // make sure we can read from head |
| r.current = r.head.Clone() |
| } |
| |
| return true |
| } |
| |
| func (r *Buffer) RemoveReadElements() bool { |
| r.eventLocker.Lock() |
| defer r.eventLocker.Unlock() |
| |
| // delete until the last data id |
| if r.head.element != nil && r.current.element != nil { |
| firstDataID := r.head.element.Value.(events.SocketDataBuffer).DataID() |
| lastDataID := r.current.element.Value.(events.SocketDataBuffer).DataID() |
| startDelete := false |
| for e := r.detailEvents.Front(); e != nil; { |
| event := e.Value.(*events.SocketDetailEvent) |
| if !startDelete && event.DataID >= firstDataID && event.DataID <= lastDataID { |
| startDelete = true |
| } else if startDelete && event.DataID > lastDataID { |
| // out of the data id, just break |
| break |
| } |
| |
| if startDelete { |
| tmp := e.Next() |
| r.detailEvents.Remove(e) |
| e = tmp |
| } else { |
| e = e.Next() |
| } |
| } |
| } |
| |
| // delete until to current position |
| next := r.head.element |
| for ; next != nil && next != r.current.element; next = r.removeElement0(next) { |
| } |
| if next != nil && next.Value.(events.SocketDataBuffer).BufferLen() == r.current.bufIndex { |
| // the last event already read finished, then delete it |
| r.head.element = r.removeElement0(next) |
| r.head.bufIndex = 0 |
| } else if next != nil { |
| // keep using the latest element |
| r.head.element = next |
| } else { |
| return true |
| } |
| return false |
| } |
| |
| // SkipCurrentElement skip current element in reader, if return true means have read finished |
| func (r *Buffer) SkipCurrentElement() bool { |
| r.head.element = r.nextElement(r.current.element) |
| r.current.bufIndex = 0 |
| |
| return r.head.element == nil |
| } |
| |
| func (r *Buffer) removeElement0(element *list.Element) *list.Element { |
| if element == nil { |
| return nil |
| } |
| result := element.Next() |
| r.dataEvents.Remove(element) |
| return result |
| } |
| |
| func (r *Buffer) AppendDetailEvent(event *events.SocketDetailEvent) { |
| r.eventLocker.Lock() |
| defer r.eventLocker.Unlock() |
| |
| if r.detailEvents.Len() == 0 { |
| r.detailEvents.PushFront(event) |
| return |
| } |
| if r.detailEvents.Back().Value.(*events.SocketDetailEvent).DataID < event.DataID { |
| r.detailEvents.PushBack(event) |
| return |
| } |
| beenAdded := false |
| for element := r.detailEvents.Front(); element != nil; element = element.Next() { |
| existEvent := element.Value.(*events.SocketDetailEvent) |
| if existEvent.DataID > event.DataID { |
| // data id needs order |
| beenAdded = true |
| } |
| if beenAdded { |
| r.detailEvents.InsertBefore(event, element) |
| break |
| } |
| } |
| if !beenAdded { |
| r.detailEvents.PushBack(event) |
| } |
| } |
| |
| // AppendDataEvent insert the event to the event list following the order |
| func (r *Buffer) AppendDataEvent(event *events.SocketDataUploadEvent) { |
| r.eventLocker.Lock() |
| defer r.eventLocker.Unlock() |
| |
| if r.dataEvents.Len() == 0 { |
| r.dataEvents.PushFront(event) |
| return |
| } |
| if r.dataEvents.Back().Value.(events.SocketDataBuffer).DataID() < event.DataID() { |
| r.dataEvents.PushBack(event) |
| return |
| } |
| beenAdded := false |
| for element := r.dataEvents.Front(); element != nil; element = element.Next() { |
| existEvent := element.Value.(events.SocketDataBuffer) |
| if existEvent.DataID() > event.DataID() { |
| // data id needs order |
| beenAdded = true |
| } else if existEvent.DataID() == event.DataID() && existEvent.DataSequence() > event.DataSequence() { |
| // following the sequence order |
| beenAdded = true |
| } |
| if beenAdded { |
| r.dataEvents.InsertBefore(event, element) |
| break |
| } |
| } |
| if !beenAdded { |
| r.dataEvents.PushBack(event) |
| } |
| } |
| |
| func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int { |
| r.eventLocker.Lock() |
| defer r.eventLocker.Unlock() |
| |
| expireTime := time.Now().Add(-expireDuration) |
| // data event queue |
| count := r.deleteEventsWithJudgement(r.dataEvents, func(element *list.Element) bool { |
| buffer := element.Value.(events.SocketDataBuffer) |
| startTime := host.Time(buffer.StartTime()) |
| if expireTime.After(startTime) { |
| r.latestExpiredDataID = buffer.DataID() |
| return true |
| } |
| return false |
| }) |
| |
| // detail event queue |
| count += r.deleteEventsWithJudgement(r.detailEvents, func(element *list.Element) bool { |
| return r.latestExpiredDataID > 0 && element.Value.(*events.SocketDetailEvent).DataID <= r.latestExpiredDataID |
| }) |
| return count |
| } |
| |
| func (r *Buffer) DataLength() int { |
| return r.dataEvents.Len() |
| } |
| |
| func (r *Buffer) DetailLength() int { |
| return r.detailEvents.Len() |
| } |
| |
| func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element *list.Element) bool) int { |
| count := 0 |
| for e := l.Front(); e != nil; { |
| if checker(e) { |
| count++ |
| cur := e |
| e = e.Next() |
| l.Remove(cur) |
| } else { |
| break |
| } |
| } |
| return count |
| } |
| |
| func (r *Buffer) nextElement(e *list.Element) *list.Element { |
| if e == nil { |
| return nil |
| } |
| r.eventLocker.RLock() |
| defer r.eventLocker.RUnlock() |
| return e.Next() |
| } |
| |
| func (p *Position) Clone() *Position { |
| return &Position{element: p.element, bufIndex: p.bufIndex} |
| } |