blob: 3514d024b947630a9c048035bcae2837eb7b20ab [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package base
import (
"fmt"
"time"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
"github.com/apache/skywalking-rover/pkg/tools"
agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
logv3 "skywalking.apache.org/repo/goapi/collect/logging/v3"
)
// ConnectionMetrics is the metrics container produced by a single listener.
// NewConnectionMetrics stores one value of this type per listener, keyed by the
// listener name.
type ConnectionMetrics interface {
// MergeMetricsFromConnection merges data, which is the metrics of the same
// listener taken from the given connection, into the receiver.
MergeMetricsFromConnection(connection *ConnectionContext, data ConnectionMetrics)
}
// ConnectionMetricsContext groups the metrics of every listener.
type ConnectionMetricsContext struct {
// data maps a listener name to the metrics generated by that listener.
data map[string]ConnectionMetrics
}
// NewConnectionMetrics creates a metrics context that holds one freshly
// generated metrics value for each listener of the analyzer, keyed by the
// listener name.
func (c *AnalyzerContext) NewConnectionMetrics() *ConnectionMetricsContext {
	metricsCtx := &ConnectionMetricsContext{data: make(map[string]ConnectionMetrics)}
	for _, listener := range c.listeners {
		metricsCtx.data[listener.Name()] = listener.GenerateMetrics()
	}
	return metricsCtx
}
// GetMetrics returns the metrics stored under listenerName. The result is nil
// when no metrics are stored for that name.
func (c *ConnectionMetricsContext) GetMetrics(listenerName string) ConnectionMetrics {
	return c.data[listenerName]
}
// MergeMetricsFromConnection folds the metrics of the given connection into this
// context. For every listener name held here, the connection's metrics of the
// same name are looked up and merged into the local ones.
func (c *ConnectionMetricsContext) MergeMetricsFromConnection(connection *ConnectionContext) {
	for listenerName, target := range c.data {
		source := connection.Metrics.GetMetrics(listenerName)
		target.MergeMetricsFromConnection(connection, source)
	}
}
// MetricsBuilder accumulates meters, logs and span attached events and
// turns them into the collections returned by the Build* methods.
type MetricsBuilder struct {
// prefix is the value returned by MetricPrefix.
prefix string
// metrics groups the appended meters by their metadata key.
metrics map[metadata][]*agentv3.MeterData
// logs groups the appended logs by their metadata key.
logs map[metadata][]*logv3.LogData
// events is the flat list of appended span attached events.
events []*agentv3.SpanAttachedEvent
}
// NewMetricsBuilder returns an empty builder that reports the given prefix
// through MetricPrefix. The meter and log maps are initialized; the event list
// starts out nil.
func NewMetricsBuilder(prefix string) *MetricsBuilder {
	builder := new(MetricsBuilder)
	builder.prefix = prefix
	builder.metrics = make(map[metadata][]*agentv3.MeterData)
	builder.logs = make(map[metadata][]*logv3.LogData)
	return builder
}
// AppendMetrics adds the given meters to the group identified by the service
// and instance names.
//
// The meters are always appended into a slice owned by the builder. The
// caller's slice is never stored directly, so a later AppendMetrics call for
// the same key cannot write into the caller's backing array. Empty input is
// ignored and does not create a group.
func (m *MetricsBuilder) AppendMetrics(service, instance string, metrics []*agentv3.MeterData) {
	if len(metrics) == 0 {
		return
	}
	meta := metadata{ServiceName: service, InstanceName: instance}
	// Appending to the (possibly nil) existing group copies the pointers into
	// builder-owned storage instead of aliasing the caller's slice.
	m.metrics[meta] = append(m.metrics[meta], metrics...)
}
// AppendLogs adds a single log entry to the group of the given service. Logs
// are grouped by service name only.
func (m *MetricsBuilder) AppendLogs(service string, log *logv3.LogData) {
	key := metadata{ServiceName: service}
	group := m.logs[key]
	m.logs[key] = append(group, log)
}
// AppendSpanAttachedEvents adds the given events to the end of the builder's
// event list.
func (m *MetricsBuilder) AppendSpanAttachedEvents(attachedEvents []*agentv3.SpanAttachedEvent) {
	// Appending nothing leaves the list unchanged, so there is nothing to do.
	if len(attachedEvents) == 0 {
		return
	}
	m.events = append(m.events, attachedEvents...)
}
// MetricPrefix returns the prefix the builder was created with.
func (m *MetricsBuilder) MetricPrefix() string {
	return m.prefix
}
// BuildBasicMeterLabels builds the labels shared by the meters of one traffic.
//
// It returns the role used for the labels, which is the traffic role with an
// unknown role replaced by the client role, together with these labels in order:
//   - "<role>_process_id": the ID of the local process
//   - the remote process/address label, prefixed with the role returned by Revert()
//   - "side": the role
//   - "protocol": the traffic protocol
//   - "is_ssl": whether the traffic is SSL, formatted with %t
func (m *MetricsBuilder) BuildBasicMeterLabels(traffic *ProcessTraffic, local api.ProcessInterface) (events.ConnectionRole, []*agentv3.Label) {
	role := traffic.Role
	if role == events.ConnectionRoleUnknown {
		// fall back to the client role when the role is unknown
		role = events.ConnectionRoleClient
	}
	side := role.String()

	labels := []*agentv3.Label{}

	// the local process and the remote process/address
	labels = m.appendMeterValue(labels, side+"_process_id", local.ID())
	labels = m.appendRemoteAddressInfo(labels, traffic, role.Revert().String(), local)
	labels = m.appendMeterValue(labels, "side", side)

	// protocol and SSL state
	labels = m.appendMeterValue(labels, "protocol", traffic.Protocol.String())
	labels = m.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", traffic.IsSSL))

	return role, labels
}
// BuildMetrics converts every non-empty meter group into one collection.
//
// Only the first meter of each group receives the service name, the instance
// name and the current timestamp in milliseconds; the other meters of the group
// are left untouched. The returned slice is never nil.
func (m *MetricsBuilder) BuildMetrics() []*agentv3.MeterDataCollection {
	timestamp := time.Now().UnixMilli()
	result := []*agentv3.MeterDataCollection{}
	for key, group := range m.metrics {
		if len(group) == 0 {
			continue
		}
		head := group[0]
		head.Service = key.ServiceName
		head.ServiceInstance = key.InstanceName
		head.Timestamp = timestamp
		result = append(result, &agentv3.MeterDataCollection{MeterData: group})
	}
	return result
}
// BuildLogs returns every non-empty log group as its own slice.
//
// The service name is written to the first log of each group only, while the
// current timestamp in milliseconds is written to every log. The returned slice
// is never nil.
func (m *MetricsBuilder) BuildLogs() [][]*logv3.LogData {
	timestamp := time.Now().UnixMilli()
	batches := [][]*logv3.LogData{}
	for key, batch := range m.logs {
		if len(batch) == 0 {
			continue
		}
		batch[0].Service = key.ServiceName
		// refresh the timestamp of every log in the group
		for i := range batch {
			batch[i].Timestamp = timestamp
		}
		batches = append(batches, batch)
	}
	return batches
}
// BuildEvents returns the span attached events appended so far. The builder's
// own slice is returned, not a copy.
func (m *MetricsBuilder) BuildEvents() []*agentv3.SpanAttachedEvent {
	return m.events
}
// metadata is the map key used by MetricsBuilder to group meters and logs.
type metadata struct {
// Layer is left empty by AppendMetrics and AppendLogs.
Layer string
// ServiceName is the service the grouped data belongs to.
ServiceName string
// InstanceName is the service instance; AppendLogs leaves it empty.
InstanceName string
}
// appendRemoteAddressInfo appends exactly one label describing the remote side
// of the traffic and returns the extended label slice. The label is chosen by
// the first rule that applies:
//   - "<prefix>_process_id": the ID of the first remote process whose service and
//     instance names both equal those of the local process
//   - "<prefix>_local" = "true": the remote IP is accepted by
//     tools.IsLocalHostAddress or by the analyzer's IsLocalAddressInCache
//   - "<prefix>_address": the remote endpoint formatted as "ip:port"
func (m *MetricsBuilder) appendRemoteAddressInfo(labels []*agentv3.Label, traffic *ProcessTraffic, prefix string,
	local api.ProcessInterface) []*agentv3.Label {
	for _, remote := range traffic.RemoteProcesses {
		// skip processes that do not belong to the same service instance
		if local.Entity().ServiceName != remote.Entity().ServiceName ||
			local.Entity().InstanceName != remote.Entity().InstanceName {
			continue
		}
		return m.appendMeterValue(labels, prefix+"_process_id", remote.ID())
	}

	remoteIsLocal := tools.IsLocalHostAddress(traffic.RemoteIP) ||
		traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP)
	if !remoteIsLocal {
		address := fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort)
		return m.appendMeterValue(labels, prefix+"_address", address)
	}
	return m.appendMeterValue(labels, prefix+"_local", "true")
}
// appendMeterValue appends a new label with the given name and value to labels
// and returns the resulting slice.
func (m *MetricsBuilder) appendMeterValue(labels []*agentv3.Label, name, value string) []*agentv3.Label {
	label := &agentv3.Label{Name: name, Value: value}
	return append(labels, label)
}