blob: 15b49ccaa571bf30c59b101ef9b2822044833ffa [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package protocols
import (
"bufio"
"bytes"
"container/list"
"encoding/json"
"fmt"
"io"
"net/http"
"net/textproto"
"strconv"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
)
// HTTP1ProtocolName is the name reported by HTTP1Analyzer.Name and the key used with
// QueryProtocolMetrics to look up the HTTP/1 metrics of a connection or traffic.
var HTTP1ProtocolName = "http1"
// HTTP1PackageSizeHistogramBuckets holds the bucket boundaries of the request/response
// package size histograms. Boundaries are multiples of 1024, matching the KB/MB labels
// in the comments below.
var HTTP1PackageSizeHistogramBuckets = []float64{
	// 0.25KB, 0.5KB, 1KB, 1.5KB, 2KB, 3KB, 5KB, 8KB, 10KB, 15KB, 20KB, 35KB, 50KB, 75KB, 100KB, 200KB, 500KB
	256, 512, 1024, 1536, 2048, 3072, 5120, 8192, 10240, 15360, 20480, 35840, 51200, 76800, 102400, 204800, 512000,
	// 800KB, 1M, 1.5M, 2M, 5M, 10M, 20M, 50M
	819200, 1048576, 1572864, 2097152, 5242880, 10485760, 20971520, 52428800,
}
// HTTP1DurationHistogramBuckets holds the bucket boundaries, in milliseconds, of the
// client/server duration histograms.
var HTTP1DurationHistogramBuckets = []float64{
	// below 100ms
	1, 2, 5, 10, 15, 20, 25, 30, 40, 45, 50, 60, 65, 70, 80, 90,
	// 100ms up to 1s
	100, 110, 130, 150, 170, 200, 230, 260, 290, 330, 380, 430, 480, 500, 600, 700, 800, 900,
	// 1s up to 30s
	1000, 1100, 1300, 1500, 1800, 2000, 5000, 10000, 15000, 20000, 30000,
}
// HTTP1Analyzer pairs HTTP/1 request and response socket data events and turns each
// completed pair into metrics.
type HTTP1Analyzer struct {
// cache keeps the metrics of connections whose connection context could not be found
// when their data arrived, keyed by the generated connection ID. An entry is merged
// into the connection's own metrics and removed once the connection can be queried.
cache map[string]*HTTP1ConnectionMetrics
}
// HTTP1ConnectionMetrics is the HTTP/1 state of a single connection: the data events
// still waiting to be paired and the metrics generated so far.
type HTTP1ConnectionMetrics struct {
// halfData holds the data events (*SocketDataUploadEvent, request or response) that
// have not yet been combined into a finished request/response pair.
halfData *list.List
// combinedMetrics accumulates the metrics of every analyzed request/response pair.
combinedMetrics *HTTP1URIMetrics
// metricsLocker is read-locked while a pair is appended to combinedMetrics and
// write-locked while the metrics are merged out by MergeMetricsFromConnection.
metricsLocker sync.RWMutex
}
// HTTP1URIMetrics is the set of metrics collected from analyzed HTTP/1 request/response pairs.
type HTTP1URIMetrics struct {
// RequestCounter is increased once per analyzed request/response pair.
RequestCounter *metrics.Counter
// StatusCounter counts the analyzed pairs per response status code.
StatusCounter map[int]*metrics.Counter
// AvgRequestPackageSize and AvgResponsePackageSize average the total size reported by
// the request/response buffers (presumably bytes, given the bucket comments — confirm).
AvgRequestPackageSize *metrics.AvgCounter
AvgResponsePackageSize *metrics.AvgCounter
// ReqPackageSizeHistogram and RespPackageSizeHistogram use HTTP1PackageSizeHistogramBuckets.
ReqPackageSizeHistogram *metrics.Histogram
RespPackageSizeHistogram *metrics.Histogram
// ClientAvgDuration / ServerAvgDuration hold the request-to-response duration in
// milliseconds; the server side is used when the request direction is ingress.
ClientAvgDuration *metrics.AvgCounter
ServerAvgDuration *metrics.AvgCounter
// ClientDurationHistogram / ServerDurationHistogram use HTTP1DurationHistogramBuckets.
ClientDurationHistogram *metrics.Histogram
ServerDurationHistogram *metrics.Histogram
}
// NewHTTP1URIMetrics creates an HTTP1URIMetrics with every counter, average and
// histogram initialized and an empty status counter map.
func NewHTTP1URIMetrics() *HTTP1URIMetrics {
	uriMetrics := new(HTTP1URIMetrics)

	// request and status counting
	uriMetrics.RequestCounter = metrics.NewCounter()
	uriMetrics.StatusCounter = make(map[int]*metrics.Counter)

	// package sizes
	uriMetrics.AvgRequestPackageSize = metrics.NewAvgCounter()
	uriMetrics.AvgResponsePackageSize = metrics.NewAvgCounter()
	uriMetrics.ReqPackageSizeHistogram = metrics.NewHistogram(HTTP1PackageSizeHistogramBuckets)
	uriMetrics.RespPackageSizeHistogram = metrics.NewHistogram(HTTP1PackageSizeHistogramBuckets)

	// durations
	uriMetrics.ClientAvgDuration = metrics.NewAvgCounter()
	uriMetrics.ServerAvgDuration = metrics.NewAvgCounter()
	uriMetrics.ClientDurationHistogram = metrics.NewHistogram(HTTP1DurationHistogramBuckets)
	uriMetrics.ServerDurationHistogram = metrics.NewHistogram(HTTP1DurationHistogramBuckets)

	return uriMetrics
}
// NewHTTP1Analyzer creates the HTTP/1 protocol analyzer with an empty connection cache.
func NewHTTP1Analyzer() Protocol {
	analyzer := new(HTTP1Analyzer)
	analyzer.cache = make(map[string]*HTTP1ConnectionMetrics)
	return analyzer
}
// Name returns the protocol name of this analyzer, HTTP1ProtocolName.
func (h *HTTP1Analyzer) Name() string {
return HTTP1ProtocolName
}
// GenerateMetrics creates the empty HTTP/1 metrics holder of a connection: no pending
// data events and freshly initialized combined metrics.
func (h *HTTP1Analyzer) GenerateMetrics() Metrics {
	connectionMetrics := new(HTTP1ConnectionMetrics)
	connectionMetrics.halfData = list.New()
	connectionMetrics.combinedMetrics = NewHTTP1URIMetrics()
	return connectionMetrics
}
// ReceiveData handles one socket data event. It returns false when the event is not an
// HTTP event or when a completed request/response pair fails to be analyzed, and true
// otherwise (including when the event is only queued to wait for more data).
func (h *HTTP1Analyzer) ReceiveData(context Context, event *SocketDataUploadEvent) bool {
	// events of any other protocol are not for this analyzer
	if event.Protocol != base.ConnectionProtocolHTTP {
		return false
	}

	connectionID := event.GenerateConnectionID()
	connection := context.QueryConnection(event.ConnectionID, event.RandomID)

	var connectionMetrics *HTTP1ConnectionMetrics
	if connection != nil {
		connectionMetrics = QueryProtocolMetrics(connection.Metrics, HTTP1ProtocolName).(*HTTP1ConnectionMetrics)
	} else if connectionMetrics = h.cache[connectionID]; connectionMetrics == nil {
		// the connection is unknown so far: keep its metrics in the analyzer until it shows up
		connectionMetrics = h.GenerateMetrics().(*HTTP1ConnectionMetrics)
		h.cache[connectionID] = connectionMetrics
	}

	log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: %d, message type: %s, direction: %s, size: %d, total size: %d",
		connectionID, event.DataID, event.Sequence, event.Finished, event.MsgType.String(), event.Direction().String(), event.DataLen, event.TotalSize0)

	// the connection is known now: move anything cached earlier over to it
	if connection != nil {
		if pending := h.cache[connectionID]; pending != nil {
			connectionMetrics.MergeFrom(h, pending)
			delete(h.cache, connectionID)
		}
	}

	req, resp := h.buildHTTP1(connectionMetrics.halfData, event)
	if req == nil || resp == nil {
		// no complete pair yet
		log.Debugf("connnection: %s, remaining half data list size: %d", connectionID, connectionMetrics.halfData.Len())
		return true
	}

	if err := h.analyze(context, connectionID, connectionMetrics, req, resp); err != nil {
		log.Errorf("HTTP1 analyze failure: %v", err)
		return false
	}
	return true
}
// combineAndRemoveEvent removes the message that starts at firstElement from
// halfConnections and returns it as one buffer. Following elements are combined into the
// buffer and removed until an event marked as finished has been consumed or the list ends.
// When lastAppender is not nil it is combined onto the end of the result.
func (h *HTTP1Analyzer) combineAndRemoveEvent(halfConnections *list.List, firstElement *list.Element,
	lastAppender SocketDataBuffer) SocketDataBuffer {
	head := firstElement.Value.(*SocketDataUploadEvent)
	// the successor must be read before removal, Remove detaches the element
	cursor := firstElement.Next()
	halfConnections.Remove(firstElement)

	var merged SocketDataBuffer = head
	singleEvent := head.Sequence == 0 && head.Finished == 1
	if !singleEvent {
		// walk the following events until the message is finished
		for cursor != nil {
			current := cursor.Value.(*SocketDataUploadEvent)
			merged = merged.Combine(current)

			following := cursor.Next()
			halfConnections.Remove(cursor)
			cursor = following

			if current.Finished == 1 {
				break
			}
		}
	}
	return h.combineEventIfNeed(merged, lastAppender)
}
// combineEventIfNeed returns data combined with appender, or data unchanged when there is
// no appender.
func (h *HTTP1Analyzer) combineEventIfNeed(data, appender SocketDataBuffer) SocketDataBuffer {
	if appender == nil {
		return data
	}
	return data.Combine(appender)
}
// buildHTTP1 adds event to the pending events of a connection and tries to produce a
// complete request/response pair. Both results are nil when the event was only queued
// because nothing was pending.
func (h *HTTP1Analyzer) buildHTTP1(halfConnections *list.List, event *SocketDataUploadEvent) (request, response SocketDataBuffer) {
	switch halfConnections.Len() {
	case 0:
		// nothing is pending: queue the event and wait for its counterpart
		halfConnections.PushBack(event)
		return nil, nil
	case 1:
		// fast path: a complete single-event request is pending and the new event is the
		// complete single-event response that directly follows it
		head := halfConnections.Front()
		pending := head.Value.(*SocketDataUploadEvent)
		bothComplete := pending.IsStart() && pending.IsFinished() && event.IsStart() && event.IsFinished()
		if bothComplete && pending.DataID+1 == event.DataID &&
			pending.MsgType == base.SocketMessageTypeRequest && event.MsgType == base.SocketMessageTypeResponse {
			return h.combineAndRemoveEvent(halfConnections, head, nil), event
		}
	}

	// general path: insert in order, then look for a completed request and response
	h.insertToList(halfConnections, event)
	return NewHTTP1BufferAnalyzer(h).Analyze(halfConnections)
}
// insertToList inserts event into halfConnections in order: by data ID first; within
// one data ID the smaller message type comes first, and events of the same message type
// are ordered by sequence.
func (h *HTTP1Analyzer) insertToList(halfConnections *list.List, event *SocketDataUploadEvent) {
	if halfConnections.Len() == 0 {
		halfConnections.PushFront(event)
		return
	}
	// common case: the event is newer than everything pending
	if tail := halfConnections.Back().Value.(*SocketDataUploadEvent); tail.DataID < event.DataID {
		halfConnections.PushBack(event)
		return
	}

	for cursor := halfConnections.Front(); cursor != nil; cursor = cursor.Next() {
		current := cursor.Value.(*SocketDataUploadEvent)

		var goesBefore bool
		switch {
		case current.DataID > event.DataID:
			// data id needs order
			goesBefore = true
		case current.DataID == event.DataID:
			// same message type follows the sequence order, otherwise the request needs
			// to be before the response
			sameTypeLaterSequence := current.MsgType == event.MsgType && current.Sequence > event.Sequence
			goesBefore = sameTypeLaterSequence || current.MsgType > event.MsgType
		}

		if goesBefore {
			halfConnections.InsertBefore(event, cursor)
			return
		}
	}

	// no pending event has to come after the new one
	halfConnections.PushBack(event)
}
// analyze parses requestBuffer and responseBuffer as one HTTP/1 exchange and appends the
// result to the combined metrics of the connection.
//
// When the response data ends before the response could be read completely (io.EOF or
// io.ErrUnexpectedEOF), only its status line is parsed via
// tryingToReadResponseWithoutHeaders. An error is returned when the request or the
// response cannot be parsed; nothing is recorded in that case.
func (h *HTTP1Analyzer) analyze(_ Context, connectionID string, connectionMetrics *HTTP1ConnectionMetrics,
	requestBuffer, responseBuffer SocketDataBuffer) error {
	request, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(requestBuffer.BufferData())))
	if err != nil {
		return fmt.Errorf("parse request failure: data length: %d, total data length: %d, %v",
			len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
	}

	response, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
	if err != nil {
		if err != io.ErrUnexpectedEOF && err != io.EOF {
			// report the sizes of the buffer that actually failed to parse
			return fmt.Errorf("parse response failure, data length: %d, total data length: %d, %v",
				len(responseBuffer.BufferData()), responseBuffer.TotalSize(), err)
		}
		// truncated response: fall back to reading the status line only
		response, err = h.tryingToReadResponseWithoutHeaders(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
		if err != nil {
			return fmt.Errorf("parsing simple data error: %v", err)
		}
	}
	// the fallback response carries no body, so the body has to be checked as well
	if response != nil && response.Body != nil {
		defer response.Body.Close()
	}

	// The read lock only keeps the append apart from MergeMetricsFromConnection, which
	// takes the write lock before moving the metrics out.
	connectionMetrics.metricsLocker.RLock()
	defer connectionMetrics.metricsLocker.RUnlock()

	// append metrics
	combinedMetrics := connectionMetrics.combinedMetrics
	h.appendToMetrics(combinedMetrics, request, requestBuffer, response, responseBuffer)

	if log.Enable(logrus.DebugLevel) {
		// the marshal error is ignored on purpose: the JSON is only used for debug output
		metricsJSON, _ := json.Marshal(combinedMetrics)
		log.Debugf("generated metrics, connection id: %s, metrisc: %s", connectionID, string(metricsJSON))
	}
	return nil
}
// tryingToReadResponseWithoutHeaders builds a response from the status line only. It is
// the fallback for response data that is too incomplete for http.ReadResponse.
//
// The returned response has Request, Proto, ProtoMajor, ProtoMinor, Status and StatusCode
// set; headers and Body are left unset (Body is nil). An error is returned when the data
// is too short or the status line is malformed.
func (h *HTTP1Analyzer) tryingToReadResponseWithoutHeaders(reader *bufio.Reader, request *http.Request) (*http.Response, error) {
	// Shortest status line the checks below can accept: an "HTTP/x.y" protocol version
	// (8 bytes), one space and a three character status code.
	const minStatusLineLength = 12
	// Peek is used instead of reader.Size(): Size reports the capacity of the reader's
	// internal buffer, not the amount of data available, so it cannot detect short input.
	if _, err := reader.Peek(minStatusLineLength); err != nil {
		return nil, fmt.Errorf("the header length not enough")
	}

	tp := textproto.NewReader(reader)
	resp := &http.Response{
		Request: request,
	}
	line, err := tp.ReadLine()
	if err != nil {
		return nil, fmt.Errorf("read response first line failure: %v", err)
	}

	// status line layout: <proto> <status code> [<reason phrase>]
	indexByte := strings.IndexByte(line, ' ')
	if indexByte == -1 {
		return nil, fmt.Errorf("parsing response error: %s", line)
	}
	resp.Proto = line[:indexByte]
	resp.Status = strings.TrimLeft(line[indexByte+1:], " ")

	// the status code is everything up to the next space, the reason phrase is optional
	statusCode := resp.Status
	if i := strings.IndexByte(resp.Status, ' '); i != -1 {
		statusCode = resp.Status[:i]
	}
	if len(statusCode) != 3 {
		return nil, fmt.Errorf("parsing response status code failure: %v", statusCode)
	}
	resp.StatusCode, err = strconv.Atoi(statusCode)
	if err != nil || resp.StatusCode < 0 {
		return nil, fmt.Errorf("status code not correct: %s", statusCode)
	}

	var ok bool
	if resp.ProtoMajor, resp.ProtoMinor, ok = http.ParseHTTPVersion(resp.Proto); !ok {
		return nil, fmt.Errorf("parsing http version failure: %s", resp.Proto)
	}
	return resp, nil
}
// appendToMetrics records one request/response pair into data: the request count, the
// per status code count, both package sizes and the duration between the request and
// the response in milliseconds. The parsed request itself is not used.
func (h *HTTP1Analyzer) appendToMetrics(data *HTTP1URIMetrics, _ *http.Request, reqBuffer SocketDataBuffer,
	resp *http.Response, respBuffer SocketDataBuffer) {
	data.RequestCounter.Increase()

	// the counter of a status code is created the first time the code is seen
	code := resp.StatusCode
	perStatus, found := data.StatusCounter[code]
	if !found || perStatus == nil {
		perStatus = metrics.NewCounter()
		data.StatusCounter[code] = perStatus
	}
	perStatus.Increase()

	requestSize := float64(reqBuffer.TotalSize())
	responseSize := float64(respBuffer.TotalSize())
	data.AvgRequestPackageSize.Increase(requestSize)
	data.AvgResponsePackageSize.Increase(responseSize)
	data.ReqPackageSizeHistogram.Increase(requestSize)
	data.RespPackageSizeHistogram.Increase(responseSize)

	// an ingress request means the current process is the server side, anything else is
	// recorded as client side
	avgTarget, histogramTarget := data.ServerAvgDuration, data.ServerDurationHistogram
	if reqBuffer.Direction() != base.SocketDataDirectionIngress {
		avgTarget, histogramTarget = data.ClientAvgDuration, data.ClientDurationHistogram
	}

	elapsed := time.Duration(respBuffer.Time() - reqBuffer.Time())
	elapsedMS := float64(elapsed.Milliseconds())
	avgTarget.Increase(elapsedMS)
	histogramTarget.Increase(elapsedMS)
}
// MergeMetricsFromConnection moves the HTTP/1 metrics of connection into the combined
// metrics of h. The connection metrics are write-locked for the duration of the merge.
func (h *HTTP1ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext) {
	source := QueryProtocolMetrics(connection.Metrics, HTTP1ProtocolName).(*HTTP1ConnectionMetrics)

	source.metricsLocker.Lock()
	defer source.metricsLocker.Unlock()
	h.combinedMetrics.MergeAndClean(source.combinedMetrics)

	if !log.Enable(logrus.DebugLevel) {
		return
	}
	// the marshal error is ignored: the JSON is only used for this debug line
	encoded, _ := json.Marshal(h.combinedMetrics)
	log.Debugf("combine metrics: conid: %d_%d, metrics: %s", connection.ConnectionID, connection.RandomID, encoded)
}
// FlushMetrics builds the HTTP/1 meters of traffic once per local process and appends
// them to metricsBuilder under that process' service and instance name. Processes that
// produce no meter are skipped.
func (h *HTTP1ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
	trafficMetrics := QueryProtocolMetrics(traffic.Metrics, HTTP1ProtocolName).(*HTTP1ConnectionMetrics)
	combined := trafficMetrics.combinedMetrics

	for _, process := range traffic.LocalProcesses {
		meters := h.appendMetrics(make([]*v3.MeterData, 0), traffic, process, "", combined, metricsBuilder)
		if len(meters) == 0 {
			continue
		}

		if log.Enable(logrus.DebugLevel) {
			// if remote process is profiling, then the metrics data need to be cut half
			log.Debugf("flush HTTP1 metrics(%s): %s, remote process is profiling: %t, "+
				"client request count: %d, avg request size: %f, "+
				"avg response size: %f, client avg duration: %f, server avg duration: %f",
				traffic.Role.String(), traffic.GenerateConnectionInfo(), traffic.RemoteProcessIsProfiling(),
				combined.RequestCounter.Get(), combined.AvgRequestPackageSize.Calculate(),
				combined.AvgResponsePackageSize.Calculate(),
				combined.ClientAvgDuration.Calculate(), combined.ServerAvgDuration.Calculate())
		}

		entity := process.Entity()
		metricsBuilder.AppendMetrics(entity.ServiceName, process.Entity().InstanceName, meters)
	}
}
// appendMetrics appends the meters of http1Metrics for the local process to collections
// and returns the extended slice. Duration meters are taken from the client or server
// side metrics depending on the role reported by the metrics builder.
//
// Metrics are handed to buildMetrics as they are: buildMetrics already halves every
// metric when the remote process is profiling too, so halving counters and histograms
// here as well would cut them twice.
func (h *HTTP1ConnectionMetrics) appendMetrics(collections []*v3.MeterData, traffic *base.ProcessTraffic,
	local api.ProcessInterface, url string, http1Metrics *HTTP1URIMetrics, metricsBuilder *base.MetricsBuilder) []*v3.MeterData {
	role, labels := metricsBuilder.BuildBasicMeterLabels(traffic, local)
	prefix := metricsBuilder.MetricPrefix()

	collections = h.buildMetrics(collections, prefix, "request_counter", labels, url, traffic, http1Metrics.RequestCounter)
	for status, counter := range http1Metrics.StatusCounter {
		// Every status code gets its own label slice. Appending to the shared labels
		// slice could reuse its backing array and overwrite the "code" label of a meter
		// built in an earlier iteration.
		statusLabels := make([]*v3.Label, 0, len(labels)+1)
		statusLabels = append(statusLabels, labels...)
		statusLabels = append(statusLabels, &v3.Label{Name: "code", Value: strconv.Itoa(status)})
		collections = h.buildMetrics(collections, prefix, "response_status_counter", statusLabels, url, traffic, counter)
	}

	collections = h.buildMetrics(collections, prefix, "request_package_size_avg", labels, url, traffic, http1Metrics.AvgRequestPackageSize)
	collections = h.buildMetrics(collections, prefix, "response_package_size_avg", labels, url, traffic, http1Metrics.AvgResponsePackageSize)
	collections = h.buildMetrics(collections, prefix, "request_package_size_histogram", labels, url, traffic,
		http1Metrics.ReqPackageSizeHistogram)
	collections = h.buildMetrics(collections, prefix, "response_package_size_histogram", labels, url, traffic,
		http1Metrics.RespPackageSizeHistogram)

	// only the duration of the local role is reported
	avgDuration := http1Metrics.ClientAvgDuration
	durationHistogram := http1Metrics.ClientDurationHistogram
	if role == base.ConnectionRoleServer {
		avgDuration = http1Metrics.ServerAvgDuration
		durationHistogram = http1Metrics.ServerDurationHistogram
	}
	collections = h.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_avg", role.String()), labels, url,
		traffic, avgDuration)
	collections = h.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_histogram", role.String()), labels, url,
		traffic, durationHistogram)
	return collections
}
// cutHalfMetricsIfNeed returns the halved form of data when the remote process of the
// traffic is profiling as well, and data itself otherwise.
func (h *HTTP1ConnectionMetrics) cutHalfMetricsIfNeed(traffic *base.ProcessTraffic, data metrics.Metrics) metrics.Metrics {
	if !traffic.RemoteProcessIsProfiling() {
		return data
	}
	return data.CusHalfOfMetrics()
}
// buildMetrics appends data as a meter to collection and returns the extended slice.
// The meter is named "<prefix>http1_<name>", or "<prefix>http1_url_<name>" with an extra
// "url" label when url is not empty. When the remote process is profiling too, the halved
// form of data is reported.
func (h *HTTP1ConnectionMetrics) buildMetrics(collection []*v3.MeterData, prefix, name string, basicLabels []*v3.Label,
	url string, traffic *base.ProcessTraffic, data metrics.Metrics) []*v3.MeterData {
	// if remote process is also profiling, then needs to be calculated half of metrics
	if traffic.RemoteProcessIsProfiling() {
		data = data.CusHalfOfMetrics()
	}

	labels := basicLabels
	var meterName string
	if url != "" {
		// Clip the capacity so append always allocates a new array: the caller reuses
		// basicLabels for other meters and its backing array must not be written to.
		labels = append(basicLabels[:len(basicLabels):len(basicLabels)], &v3.Label{Name: "url", Value: url})
		meterName = fmt.Sprintf("%shttp1_%s_%s", prefix, "url", name)
	} else {
		meterName = fmt.Sprintf("%shttp1_%s", prefix, name)
	}
	return data.AppendMeter(collection, meterName, labels)
}
// MergeFrom inserts every pending data event of other into the pending events of h,
// keeping the order maintained by the analyzer. Only pending events are taken over.
func (h *HTTP1ConnectionMetrics) MergeFrom(analyzer *HTTP1Analyzer, other *HTTP1ConnectionMetrics) {
	if other.halfData == nil {
		return
	}
	for pending := other.halfData.Front(); pending != nil; pending = pending.Next() {
		analyzer.insertToList(h.halfData, pending.Value.(*SocketDataUploadEvent))
	}
}
// MergeAndClean merges every metric of other into u through the MergeAndClean method of
// the individual metric.
func (u *HTTP1URIMetrics) MergeAndClean(other *HTTP1URIMetrics) {
	u.RequestCounter.MergeAndClean(other.RequestCounter)
	for code, counter := range other.StatusCounter {
		// A status code unknown to u gets a counter of its own. Storing other's counter
		// pointer in u instead would skip MergeAndClean for it and make both sides share
		// one counter, so a later merge would merge that counter into itself.
		target := u.StatusCounter[code]
		if target == nil {
			target = metrics.NewCounter()
			u.StatusCounter[code] = target
		}
		target.MergeAndClean(counter)
	}

	u.AvgRequestPackageSize.MergeAndClean(other.AvgRequestPackageSize)
	u.AvgResponsePackageSize.MergeAndClean(other.AvgResponsePackageSize)
	u.ReqPackageSizeHistogram.MergeAndClean(other.ReqPackageSizeHistogram)
	u.RespPackageSizeHistogram.MergeAndClean(other.RespPackageSizeHistogram)
	u.ClientAvgDuration.MergeAndClean(other.ClientAvgDuration)
	u.ServerAvgDuration.MergeAndClean(other.ServerAvgDuration)
	u.ClientDurationHistogram.MergeAndClean(other.ClientDurationHistogram)
	u.ServerDurationHistogram.MergeAndClean(other.ServerDurationHistogram)
}