blob: 733919b8baeb33b75f9d9a374ba35ed3c19f18f3 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package http1
import (
"bufio"
"bytes"
"container/list"
"net/http"
base2 "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
)
type BufferAnalyzer struct {
http1Analyzer *Analyzer
unknownEventBuffer base2.SocketDataBuffer
unknownElement *list.Element
unknownSize int
request *base2.SocketDataUploadEvent
requestElement *list.Element
response *base2.SocketDataUploadEvent
responseElement *list.Element
unknownDataID uint64
unknownMaxSequence uint16
reqDataID uint64
reqMaxSequence uint16
reqFinished bool
respDataID uint64
respMaxSequence uint16
respFinished bool
}
func NewHTTP1BufferAnalyzer(http1 *Analyzer) *BufferAnalyzer {
return &BufferAnalyzer{http1Analyzer: http1}
}
func (h *BufferAnalyzer) Analyze(events *list.List) (request, response base2.SocketDataBuffer) {
for element := events.Front(); element != nil; element = element.Next() {
curEvent := element.Value.(*base2.SocketDataUploadEvent)
// transform the unknown to the request or response
if continueReading, req, resp := h.handleUnknown(events, element, curEvent); req != nil && resp != nil {
return req, resp
} else if continueReading {
continue
}
if continueReading, req, resp := h.handleRequest(events, element, curEvent); req != nil && resp != nil {
return req, resp
} else if continueReading {
continue
}
if req, resp := h.handleResponse(events, element, curEvent); req != nil && resp != nil {
return req, resp
}
}
return nil, nil
}
func (h *BufferAnalyzer) handleUnknown(event *list.List, element *list.Element,
curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
if curEvent.MsgType != base.SocketMessageTypeUnknown {
return false, nil, nil
}
if h.unknownEventBuffer == nil {
// maybe the unknown type is response, so clean the context
if !curEvent.IsStart() {
h.cleanContext()
return true, nil, nil
}
h.resetStartUnknown(element, curEvent)
req, resp = h.tryingToAnalyzeTheUnknown(event, curEvent)
if req != nil && resp != nil {
return false, req, resp
}
return true, nil, nil
}
if curEvent.MsgType == base.SocketMessageTypeUnknown {
if h.unknownDataID == curEvent.DataID && h.unknownMaxSequence+1 == curEvent.Sequence {
h.unknownEventBuffer = h.unknownEventBuffer.Combine(curEvent)
h.unknownMaxSequence++
} else if curEvent.IsStart() {
h.resetStartUnknown(element, curEvent)
} else {
h.cleanContext()
}
req, resp = h.tryingToAnalyzeTheUnknown(event, curEvent)
if req != nil && resp != nil {
return false, req, resp
}
return true, nil, nil
}
return false, nil, nil
}
func (h *BufferAnalyzer) handleRequest(events *list.List, element *list.Element,
curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
if h.request == nil {
// find the first request package event
if curEvent.MsgType == base.SocketMessageTypeRequest && curEvent.IsStart() {
h.resetStartRequest(element, curEvent)
}
return true, nil, nil
}
if curEvent.MsgType == base.SocketMessageTypeRequest {
// if the request not finished and latest request sequence match with current event
// then keep the request tracing
if !h.reqFinished && h.reqDataID == curEvent.DataID && h.reqMaxSequence+1 == curEvent.Sequence {
h.reqMaxSequence++
h.reqFinished = curEvent.IsFinished()
} else if curEvent.IsStart() {
// if current event is new one, then update to current request
h.resetStartRequest(element, curEvent)
} else {
// Otherwise, clean the request and response context
h.cleanContext()
}
// if request and response all finished, then return
if h.reqFinished && h.respFinished {
req, resp = h.buildHTTP(events)
return false, req, resp
}
return true, nil, nil
}
return false, nil, nil
}
func (h *BufferAnalyzer) handleResponse(events *list.List, element *list.Element,
curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
if h.response == nil {
// if current response is not start, then clean to re-find new one
if !curEvent.IsStart() {
h.cleanContext()
return nil, nil
}
h.resetStartResponse(element, curEvent)
if h.reqFinished && h.respFinished {
return h.buildHTTP(events)
}
return nil, nil
}
// if a new response, then clean the re-find new one, wait the previous data
if curEvent.IsStart() {
h.cleanContext()
return nil, nil
}
// if response sequence is broken, then clean the context
if h.respDataID != curEvent.DataID || h.respMaxSequence+1 != curEvent.Sequence {
h.cleanContext()
return nil, nil
}
h.respDataID = curEvent.DataID
h.respMaxSequence = curEvent.Sequence
if h.reqFinished && curEvent.IsFinished() {
return h.buildHTTP(events)
}
return nil, nil
}
func (h *BufferAnalyzer) resetStartUnknown(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
h.unknownEventBuffer = curEvent
h.unknownElement = element
h.unknownDataID = curEvent.DataID
h.unknownMaxSequence = curEvent.Sequence
}
func (h *BufferAnalyzer) resetStartRequest(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
h.request = curEvent
h.reqDataID = curEvent.DataID
h.reqMaxSequence = curEvent.Sequence
h.reqFinished = curEvent.IsFinished()
h.requestElement = element
}
func (h *BufferAnalyzer) resetStartResponse(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
h.response = curEvent
h.respDataID = curEvent.DataID
h.respMaxSequence = curEvent.Sequence
h.responseElement = element
h.respFinished = curEvent.IsFinished()
}
func (h *BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
if h.unknownEventBuffer == nil {
return nil, nil
}
// length not enough
if len(h.unknownEventBuffer.BufferData()) < 16 {
return nil, nil
}
_, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(h.unknownEventBuffer.BufferData())))
if err == nil {
// update the event as request
curEvent.Finished = 1
h.transformUnknown(h.unknownElement, base.SocketMessageTypeRequest)
// update the current data is request
h.resetStartRequest(h.unknownElement, h.unknownEventBuffer.FirstEvent())
h.reqFinished = true
h.cleanResponseContext()
h.cleanUnknownContext()
return nil, nil
}
tmpResponse, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(h.unknownEventBuffer.BufferData())), &http.Request{})
if err == nil {
defer tmpResponse.Body.Close()
curEvent.Finished = 1
h.transformUnknown(h.unknownElement, base.SocketMessageTypeResponse)
// if request already finished, then remove the request
if h.reqFinished {
h.resetStartResponse(h.unknownElement, h.unknownEventBuffer.FirstEvent())
return h.buildHTTP(events)
}
// otherwise, clean the context and wait request
h.cleanContext()
}
return nil, nil
}
func (h *BufferAnalyzer) transformUnknown(element *list.Element, msgType base.SocketMessageType) {
// update message type and total size
firstEvent := element.Value.(*base2.SocketDataUploadEvent)
firstEvent.MsgType = msgType
dataLen := int(firstEvent.DataLen)
for e := element.Next(); e != nil; e = e.Next() {
curEvent := e.Value.(*base2.SocketDataUploadEvent)
if curEvent.Finished == 1 {
curEvent.MsgType = msgType
dataLen += int(curEvent.DataLen)
firstEvent.TotalSize0 = uint64(dataLen)
return
}
curEvent.MsgType = msgType
dataLen += int(curEvent.DataLen)
}
}
func (h *BufferAnalyzer) cleanContext() {
h.cleanUnknownContext()
h.cleanRequestContext()
h.cleanResponseContext()
}
func (h *BufferAnalyzer) cleanResponseContext() {
h.response = nil
h.respDataID = 0
h.respMaxSequence = 0
h.respFinished = false
}
func (h *BufferAnalyzer) cleanRequestContext() {
h.request = nil
h.reqDataID = 0
h.reqMaxSequence = 0
h.reqFinished = false
}
func (h *BufferAnalyzer) cleanUnknownContext() {
h.unknownEventBuffer, h.unknownElement = nil, nil
h.unknownSize, h.unknownDataID, h.unknownMaxSequence = 0, 0, 0
}
func (h *BufferAnalyzer) buildHTTP(events *list.List) (req, resp base2.SocketDataBuffer) {
return h.http1Analyzer.combineAndRemoveEvent(events, h.requestElement, nil),
h.http1Analyzer.combineAndRemoveEvent(events, h.responseElement, nil)
}