blob: 231ac0b3c4e597efbc3b56fead89b8d579ae2b7a [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package layer4
import (
"context"
"fmt"
"net"
"sync"
"unsafe"
"github.com/sirupsen/logrus"
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/process/api"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
"golang.org/x/sys/unix"
)
// Name is the identifier of the layer-4 listener. It is returned by
// Listener.Name and is the key under which this listener's *Metrics is stored
// in a connection's metrics context (see getMetrics).
var Name = "layer4"

// Listener analyzes connections at layer 4. It maintains, in each connection's
// *Metrics, the read/write/write-RTT counters, RTT and execute-time histograms,
// connect/close execute times and socket drop/retransmit counters.
type Listener struct {
	// socket retransmit/drop statistics keyed by socket address; swapped for an
	// empty map by cleanAndGetAllExceptionContexts on every pre-flush.
	// NOTE(review): presumably filled by handleSocketExceptionOperationEvent
	// (defined elsewhere), which should also hold socketExceptionOperationLock — confirm.
	socketExceptionStatics map[SocketBasicKey]*SocketExceptionValue
	// guards socketExceptionStatics
	socketExceptionOperationLock sync.Mutex
}
// NewListener creates a layer-4 listener whose drop/retransmit statistics
// table starts out empty. The zero-value mutex needs no initialization.
func NewListener() *Listener {
	listener := new(Listener)
	listener.socketExceptionStatics = make(map[SocketBasicKey]*SocketExceptionValue)
	return listener
}
// Name reports the listener identifier, i.e. the package-level Name value.
func (l *Listener) Name() string {
	return Name
}
// Init is a no-op for the layer-4 listener. Neither the task configuration nor
// the module manager is used, and it always succeeds.
func (l *Listener) Init(config *profiling.TaskConfig, moduleManager *module.Manager) error {
	// nothing to prepare up front
	return nil
}
// GenerateMetrics creates a fresh layer-4 metrics holder for a connection.
func (l *Listener) GenerateMetrics() base.ConnectionMetrics {
	metrics := NewLayer4Metrics()
	return metrics
}
// RegisterBPFEvents subscribes handleSocketExceptionOperationEvent to the
// socket exception operation queue of the BPF loader. ctx is currently unused.
func (l *Listener) RegisterBPFEvents(ctx context.Context, bpfLoader *bpf.Loader) {
	// factory for the event value the loader decodes into
	// (presumably called once per queued record — confirm against bpf.Loader)
	newEvent := func() interface{} {
		return &SocketExceptionOperationEvent{}
	}
	bpfLoader.ReadEventAsync(bpfLoader.SocketExceptionOperationEventQueue, l.handleSocketExceptionOperationEvent, newEvent)
}
// ReceiveNewConnection stores the execute time carried by the connect event
// into the connection's layer-4 metrics.
func (l *Listener) ReceiveNewConnection(ctx *base.ConnectionContext, event *base.SocketConnectEvent) {
	metrics := l.getMetrics(ctx.Metrics)
	metrics.ConnectExecuteTime = event.ExeTime
}
// ReceiveCloseConnection updates the write/read/write-RTT counters from the
// totals carried by the close event and stores the close execute time.
func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *base.SocketCloseEvent) {
	metrics := l.getMetrics(ctx.Metrics)

	// transmit counters (bytes, count, execute time); RTT has no byte component
	metrics.WriteCounter.UpdateToCurrent(event.WriteBytes, event.WriteCount, event.WriteExeTime)
	metrics.ReadCounter.UpdateToCurrent(event.ReadBytes, event.ReadCount, event.ReadExeTime)
	metrics.WriteRTTCounter.UpdateToCurrent(0, uint64(event.WriteRTTCount), uint64(event.WriteRTTExeTime))

	metrics.CloseExecuteTime = event.ExeTime
}
// UpdateExtensionConfig ignores extension configuration updates.
func (l *Listener) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
	// intentionally empty
}
// PreFlushConnectionMetrics prepares the layer-4 metrics of ccs for flushing:
//   - refreshes the write/read/write-RTT counters from each connection's
//     BPF-side state (ActiveInBPF) when it is present,
//   - folds the entries of the BPF SocketConnectionStatsHistogram map into the
//     matching connection histograms,
//   - attaches the drop/retransmit statistics collected since the previous call.
//
// It always returns nil; BPF map failures are logged rather than returned.
func (l *Listener) PreFlushConnectionMetrics(ccs []*base.ConnectionWithBPF, bpfLoader *bpf.Loader) error {
	// index the connections by "<connectionID>_<randomID>" so BPF map entries can be
	// correlated with their ConnectionContext quickly
	keyWithContext := make(map[string]*base.ConnectionContext, len(ccs))
	for _, cc := range ccs {
		connection := cc.Connection
		layer4 := l.getMetrics(connection.Metrics)
		// basic counter update
		if activeConnection := cc.ActiveInBPF; activeConnection != nil {
			layer4.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, activeConnection.WriteCount, activeConnection.WriteExeTime)
			layer4.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes, activeConnection.ReadCount, activeConnection.ReadExeTime)
			layer4.WriteRTTCounter.UpdateToCurrent(0, activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
		}
		keyWithContext[l.generateConID(connection.ConnectionID, connection.RandomID)] = connection
		if log.Enable(logrus.DebugLevel) {
			log.Debugf("found connection: %d, %s relation: %s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, is_closed: %t, write: %d bytes/%d, read: %d bytes/%d",
				connection.ConnectionID, connection.Role.String(),
				connection.LocalIP, connection.LocalPort, connection.LocalPid, connection.RemoteIP, connection.RemotePort,
				connection.Protocol.String(), connection.IsSSL, connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
				layer4.WriteCounter.Cur.Count, layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
		}
	}

	l.foldConnectionStatsHistograms(keyWithContext, bpfLoader)

	// attach all the exception operations collected since the previous flush
	exceptionContexts := l.cleanAndGetAllExceptionContexts()
	l.combineExceptionToConnections(keyWithContext, exceptionContexts)
	return nil
}

// foldConnectionStatsHistograms walks the BPF SocketConnectionStatsHistogram map
// and adds each (bucket, count) entry to the histogram of the connection it
// belongs to. Entries of connections missing from keyWithContext are ignored.
// Entries of already-closed connections are deleted, but only once the walk
// has finished.
func (l *Listener) foldConnectionStatsHistograms(keyWithContext map[string]*base.ConnectionContext, bpfLoader *bpf.Loader) {
	var key HistogramDataKey
	var count uint32
	// Keys are deleted after the iteration, not during it: deleting from a hash map
	// that is being iterated may revisit keys or abort the iteration.
	// NOTE(review): per the cilium/ebpf Map.Iterate docs, assuming that library backs
	// SocketConnectionStatsHistogram — confirm.
	var closedKeys []HistogramDataKey
	histogramIt := bpfLoader.SocketConnectionStatsHistogram.Iterate()
	for histogramIt.Next(&key, &count) {
		// if it's not related to a ConnectionContext just ignore it
		cc := keyWithContext[l.generateConID(key.ConnectionID, key.RandomID)]
		if cc == nil {
			continue
		}
		// Closed connections have their entries removed once folded. Every entry is
		// collected here, including unknown direction/type ones that were previously
		// skipped and therefore never removed from the map.
		if cc.ConnectionClosed {
			closedKeys = append(closedKeys, key)
		}
		histogram := l.histogramOf(cc, &key)
		if histogram == nil {
			log.Warnf("unknown the histogram data: %v", key)
			continue
		}
		histogram.UpdateToCurrent(key.Bucket, count)
	}
	// Next returns false both at the end of the map and on failure; surface failures
	if err := histogramIt.Err(); err != nil {
		log.Warnf("iterate the connection stats histogram failure: %v", err)
	}
	for _, closedKey := range closedKeys {
		if err := bpfLoader.SocketConnectionStatsHistogram.Delete(closedKey); err != nil {
			log.Warnf("delete the connection stats failure: %v", err)
		}
	}
}

// histogramOf returns the histogram of cc fed by the map entry described by key.
// It returns nil when the direction/type combination is not tracked.
func (l *Listener) histogramOf(cc *base.ConnectionContext, key *HistogramDataKey) *SocketDataHistogramWithHistory {
	layer4 := l.getMetrics(cc.Metrics)
	switch key.DataDirection {
	case base.SocketDataDirectionEgress:
		switch key.DataType {
		case base.SocketDataStaticsTypeExeTime:
			return layer4.WriteExeTimeHistogram
		case base.SocketDataStaticsTypeRTT:
			return layer4.WriteRTTHistogram
		}
	case base.SocketDataDirectionIngress:
		return layer4.ReadExeTimeHistogram
	}
	return nil
}
// PostFlushConnectionMetrics calls RefreshCurrent on every layer-4 counter and
// histogram of the flushed connections. This presumably starts a new reporting
// window; see the *WithHistory types.
func (l *Listener) PostFlushConnectionMetrics(ccs []*base.ConnectionContext) {
	for i := range ccs {
		m := l.getMetrics(ccs[i].Metrics)

		// data transmit counters and histograms
		m.WriteCounter.RefreshCurrent()
		m.ReadCounter.RefreshCurrent()
		m.WriteRTTCounter.RefreshCurrent()
		m.WriteRTTHistogram.RefreshCurrent()
		m.WriteExeTimeHistogram.RefreshCurrent()
		m.ReadExeTimeHistogram.RefreshCurrent()

		// connect/close counters and histograms
		m.ConnectCounter.RefreshCurrent()
		m.CloseCounter.RefreshCurrent()
		m.ConnectExeTimeHistogram.RefreshCurrent()
		m.CloseExeTimeHistogram.RefreshCurrent()

		// socket exception counters
		m.RetransmitCounter.RefreshCurrent()
		m.DropCounter.RefreshCurrent()
	}
}
// FlushMetrics converts the current layer-4 counters and histograms of every
// traffic into meter data. It appends one batch per local process to builder
// and skips processes for which no meter was produced.
func (l *Listener) FlushMetrics(traffics []*base.ProcessTraffic, builder *base.MetricsBuilder) {
	l.logTheMetricsConnections(traffics)
	prefix := builder.MetricPrefix()
	for _, traffic := range traffics {
		m := l.getMetrics(traffic.Metrics)
		for _, local := range traffic.LocalProcesses {
			var meters []*v3.MeterData

			// single-value counters
			meters = l.appendCounterValues(meters, prefix, "write", traffic, local, m.WriteCounter, builder)
			meters = l.appendCounterValues(meters, prefix, "read", traffic, local, m.ReadCounter, builder)
			meters = l.appendCounterValues(meters, prefix, "write_rtt", traffic, local, m.WriteRTTCounter, builder)
			meters = l.appendCounterValues(meters, prefix, "connect", traffic, local, m.ConnectCounter, builder)
			meters = l.appendCounterValues(meters, prefix, "close", traffic, local, m.CloseCounter, builder)
			meters = l.appendCounterValues(meters, prefix, "retransmit", traffic, local, m.RetransmitCounter, builder)
			meters = l.appendCounterValues(meters, prefix, "drop", traffic, local, m.DropCounter, builder)

			// histograms
			meters = l.appendHistogramValue(meters, prefix, "write_rtt", traffic, local, m.WriteRTTHistogram, builder)
			meters = l.appendHistogramValue(meters, prefix, "write_exe_time", traffic, local, m.WriteExeTimeHistogram, builder)
			meters = l.appendHistogramValue(meters, prefix, "read_exe_time", traffic, local, m.ReadExeTimeHistogram, builder)
			meters = l.appendHistogramValue(meters, prefix, "connect_exe_time", traffic, local, m.ConnectExeTimeHistogram, builder)
			meters = l.appendHistogramValue(meters, prefix, "close_exe_time", traffic, local, m.CloseExeTimeHistogram, builder)

			if len(meters) > 0 {
				builder.AppendMetrics(local.Entity().ServiceName, local.Entity().InstanceName, meters)
			}
		}
	}
}
// logTheMetricsConnections writes one debug line per traffic. Each line holds
// the role, connection info, protocol, SSL flag and the current write/read
// byte and count totals. Nothing is computed unless debug logging is enabled.
func (l *Listener) logTheMetricsConnections(traffics []*base.ProcessTraffic) {
	if log.Enable(logrus.DebugLevel) {
		for _, t := range traffics {
			role := t.Role.String()
			m := l.getMetrics(t.Metrics)
			log.Debugf("connection layer4 analyze result: %s : %s, protocol: %s, is SSL: %t, write: %d bytes/%d, read: %d bytes/%d",
				role, t.GenerateConnectionInfo(), t.Protocol.String(), t.IsSSL,
				m.WriteCounter.Cur.Bytes, m.WriteCounter.Cur.Count,
				m.ReadCounter.Cur.Bytes, m.ReadCounter.Cur.Count)
		}
	}
}
// generateConID builds the "<conID>_<randomID>" lookup key that correlates BPF
// map entries with connection contexts.
func (l *Listener) generateConID(conID, randomID uint64) string {
	// Sprint inserts no spaces here because "_" is a string operand
	return fmt.Sprint(conID, "_", randomID)
}
// cleanAndGetAllExceptionContexts detaches the drop/retransmit statistics
// collected so far and installs a fresh empty map under
// socketExceptionOperationLock. Provided every writer also holds that lock,
// the caller owns the returned map exclusively.
func (l *Listener) cleanAndGetAllExceptionContexts() map[SocketBasicKey]*SocketExceptionValue {
	fresh := make(map[SocketBasicKey]*SocketExceptionValue)

	l.socketExceptionOperationLock.Lock()
	defer l.socketExceptionOperationLock.Unlock()

	snapshot := l.socketExceptionStatics
	l.socketExceptionStatics = fresh
	return snapshot
}
// combineExceptionToConnections attributes each collected drop/retransmit
// statistic in exps to one connection in ccs. Matching works as follows:
//   - A connection whose remote AND local address/port match is preferred.
//   - Otherwise the first connection seen whose remote address/port matches is
//     used. ccs is a map, so "first" follows Go's unspecified iteration order.
//   - Statistics with a family other than AF_INET/AF_INET6, or without any
//     remote match, are dropped.
func (l *Listener) combineExceptionToConnections(ccs map[string]*base.ConnectionContext, exps map[SocketBasicKey]*SocketExceptionValue) {
	for key, value := range exps {
		var remoteIP, localIP string
		switch key.Family {
		case unix.AF_INET:
			remoteIP = parseAddressV4(key.RemoteAddrV4)
			localIP = parseAddressV4(key.LocalAddrV4)
		case unix.AF_INET6:
			remoteIP = parseAddressV6(key.RemoteAddrV6)
			localIP = parseAddressV6(key.LocalAddrV6)
		default:
			// unknown address family: cannot be matched against any connection
			continue
		}
		remotePort, localPort := uint16(key.RemotePort), uint16(key.LocalPort)

		var exactMatch, firstRemoteMatch *base.ConnectionContext
		for _, cc := range ccs {
			if cc.RemoteIP != remoteIP || cc.RemotePort != remotePort {
				continue
			}
			if cc.LocalIP == localIP && cc.LocalPort == localPort {
				exactMatch = cc
				break
			}
			// keep the FIRST remote-only match; previously every match overwrote it
			if firstRemoteMatch == nil {
				firstRemoteMatch = cc
			}
		}

		target := exactMatch
		if target == nil {
			// only the remote address matched: fall back to the first such connection
			target = firstRemoteMatch
		}
		if target != nil {
			l.mergeExceptionToAppointConnection(value, target)
		}
	}
}
// mergeExceptionToAppointConnection adds the drop and retransmit counts of
// expCtx to the DropCounter and RetransmitCounter of conCtx's layer-4 metrics.
// Both are recorded as count-only values, with zero bytes and zero exe time.
func (l *Listener) mergeExceptionToAppointConnection(expCtx *SocketExceptionValue, conCtx *base.ConnectionContext) {
	metrics := l.getMetrics(conCtx.Metrics)
	drops := NewSocketDataCounterWithValue(0, uint64(expCtx.DropCount), 0)
	retransmits := NewSocketDataCounterWithValue(0, uint64(expCtx.RetransmitCount), 0)
	metrics.DropCounter.IncreaseToCurrent(drops)
	metrics.RetransmitCounter.IncreaseToCurrent(retransmits)
}
// appendCounterValues appends single-value meters built from the current window
// of counter. It returns metrics unchanged when that window is empty. The
// meters are:
//   - <name>_counts_counter: the count, always emitted;
//   - <name>_bytes_counter: emitted only when bytes > 0;
//   - <name>_exe_time_counter: execute time divided by count, emitted only
//     when both exe time and count are non-zero.
func (l *Listener) appendCounterValues(metrics []*v3.MeterData, metricsPrefix, name string, traffic *base.ProcessTraffic,
	local api.ProcessInterface, counter *SocketDataCounterWithHistory, builder *base.MetricsBuilder) []*v3.MeterData {
	metric := counter.Cur
	if !metric.NotEmpty() {
		return metrics
	}
	count := float64(metric.Count)
	metrics = append(metrics, l.buildSingleValue(metricsPrefix, name+"_counts_counter", traffic, local, count, builder))
	if metric.Bytes > 0 {
		metrics = append(metrics, l.buildSingleValue(metricsPrefix, name+"_bytes_counter", traffic, local, float64(metric.Bytes), builder))
	}
	// the exe time is reported as an average per count; without a count the float
	// division would yield +Inf, which must not be sent as a metric value
	if metric.ExeTime > 0 && metric.Count > 0 {
		metrics = append(metrics, l.buildSingleValue(metricsPrefix, name+"_exe_time_counter", traffic, local, float64(metric.ExeTime)/count, builder))
	}
	return metrics
}
// appendHistogramValue appends one histogram meter named
// "<prefix><role>_<name>_histogram", built from the current window of
// histogram. It returns metrics unchanged when that window is empty.
// Bucket indexes are mapped to boundaries via the microsecond or nanosecond
// table, chosen by the histogram unit. Indexes outside the table are reported
// in its last bucket.
func (l *Listener) appendHistogramValue(metrics []*v3.MeterData, metricsPrefix, name string, traffic *base.ProcessTraffic,
	local api.ProcessInterface, histogram *SocketDataHistogramWithHistory, metricsBuilder *base.MetricsBuilder) []*v3.MeterData {
	data := histogram.Cur
	if !data.NotEmpty() {
		return metrics
	}
	// the boundary table depends only on the unit, so pick it once instead of per bucket
	var buckets []float64
	if data.Unit == HistogramDataUnitUS {
		buckets = SocketHistogramBucketsUs
	} else {
		buckets = SocketHistogramBucketsNs
	}
	role, labels := metricsBuilder.BuildBasicMeterLabels(traffic, local)
	values := make([]*v3.MeterBucketValue, 0, len(data.Buckets))
	for bucket, count := range data.Buckets {
		// Clamp out-of-range indexes into the last bucket. The < 0 check covers huge
		// unsigned bucket keys that wrap negative on conversion to int and would
		// otherwise panic on indexing.
		bucketInx := int(bucket)
		if bucketInx < 0 || bucketInx >= SocketHistogramBucketsCount {
			bucketInx = SocketHistogramBucketsCount - 1
		}
		values = append(values, &v3.MeterBucketValue{
			Bucket: buckets[bucketInx],
			Count:  int64(count),
		})
	}
	return append(metrics, &v3.MeterData{
		Metric: &v3.MeterData_Histogram{
			Histogram: &v3.MeterHistogram{
				Name:   fmt.Sprintf("%s%s_%s_histogram", metricsPrefix, role.String(), name),
				Labels: labels,
				Values: values,
			},
		},
	})
}
// buildSingleValue wraps val in a single-value meter named
// "<prefix><role>_<name>". The role and labels are derived from traffic and
// the local process by metricBuilder.
func (l *Listener) buildSingleValue(prefix, name string, traffic *base.ProcessTraffic, local api.ProcessInterface, val float64,
	metricBuilder *base.MetricsBuilder) *v3.MeterData {
	role, labels := metricBuilder.BuildBasicMeterLabels(traffic, local)
	single := &v3.MeterSingleValue{
		Name:   fmt.Sprintf("%s%s_%s", prefix, role.String(), name),
		Labels: labels,
		Value:  val,
	}
	return &v3.MeterData{Metric: &v3.MeterData_SingleValue{SingleValue: single}}
}
// getMetrics returns this listener's *Metrics, stored in connectionMetrics under
// Name. The unchecked type assertion panics if the stored value is not a *Metrics.
func (l *Listener) getMetrics(connectionMetrics *base.ConnectionMetricsContext) *Metrics {
	stored := connectionMetrics.GetMetrics(Name)
	return stored.(*Metrics)
}
// HistogramDataKey is the key decoded from each entry of the BPF
// SocketConnectionStatsHistogram map during PreFlushConnectionMetrics. The
// matching map value is a uint32 count.
// NOTE(review): presumably mirrors a C struct on the BPF side byte-for-byte, so
// field order, sizes and the trailing padding must stay in sync with it — confirm.
type HistogramDataKey struct {
	// ConnectionID and RandomID together identify the connection the entry belongs to.
	ConnectionID uint64
	RandomID     uint64
	// Bucket is the histogram bucket the count is added to.
	Bucket uint64
	// DataDirection selects egress (write) or ingress (read) data.
	DataDirection base.SocketDataDirection
	// DataType selects the statistic kind (execute time or RTT) for egress data.
	DataType base.SocketDataStaticsType
	// Fix appears to be explicit padding keeping the layout aligned with the BPF struct — TODO confirm.
	Fix [6]byte
}
// parseAddressV4 formats an IPv4 address stored in val as dotted text. The
// four bytes are read in val's in-memory order, not its numeric value.
// NOTE(review): this assumes val was copied verbatim from a kernel structure
// holding the address in network byte order — confirm against the BPF side.
func parseAddressV4(val uint32) string {
	raw := (*[net.IPv4len]byte)(unsafe.Pointer(&val))
	return net.IP(raw[:]).String()
}
// parseAddressV6 formats the 16-byte IPv6 address in val (bytes in array order)
// as text. net.IP.String renders IPv4-mapped addresses (::ffff:a.b.c.d) in
// dotted IPv4 form.
func parseAddressV6(val [16]uint8) string {
	// val is already a byte array, so slicing it is enough; no unsafe cast is needed
	return net.IP(val[:]).String()
}