blob: 8e96e395c78605bba009247aa38a34eb3cdaf9e8 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package base
import (
"fmt"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
"github.com/apache/skywalking-rover/pkg/tools"
)
// log is the logger used by the network traffic analyzer.
var log = logger.GetLogger("profiling", "task", "network", "analyze")

const (
	// layerMeshDP is the entity layer name of mesh data plane processes.
	layerMeshDP = "MESH_DP"
	// layerMeshApp is the entity layer name of mesh application processes.
	layerMeshApp = "MESH"
	// processEnvoy is the executable name compared against process exe names to recognize envoy.
	processEnvoy = "envoy"
)
// ProcessTraffic is the aggregation of one or more connections between a local process
// and a remote peer (a resolved remote process, or just a remote address).
type ProcessTraffic struct {
	// Analyzer is the TrafficAnalyzer that created this traffic.
	Analyzer *TrafficAnalyzer

	// local process information, taken from the first connection combined into this traffic
	LocalIP        string
	LocalPort      uint16
	LocalPid       uint32
	LocalProcesses []api.ProcessInterface

	// remote process/address information; RemotePid is 0 and RemoteProcesses is empty when the
	// remote process could not be resolved. RemoteIP/RemotePort hold the address of the most
	// recently combined connection.
	RemoteIP        string
	RemotePort      uint16
	RemotePid       uint32
	RemoteProcesses []api.ProcessInterface

	// connection basic information: Role and Protocol keep the first known (non-unknown) value
	// among the combined connections; IsSSL is true if any combined connection is SSL.
	Role     events.ConnectionRole
	Protocol events.ConnectionProtocol
	IsSSL    bool

	// Metrics accumulates the metrics merged from every combined connection.
	Metrics *ConnectionMetricsContext
}
// TrafficAnalyzer combines connections into ProcessTraffic entries. The caches below are
// filled from the analyzed connections themselves and are used to resolve the remote
// process of each connection.
type TrafficAnalyzer struct {
	analyzeContext *AnalyzerContext

	// localWithPeerCache matches the two ends of the same connection: a connect request
	// (local:a -> remote:b) pairs with the accepted side seen as (local:b -> remote:a).
	// key: local IP:port + remote IP:port of a cached connection
	// value: the local pid and role of that connection
	localWithPeerCache map[LocalWithPeerAddress]*PidWithRole

	// peerAddressCache resolves connections that only know their remote address: a connect
	// request (local:unknown -> remote:b) is matched to the processes accepting on local:b.
	// key: the local IP:port of cached connections (stored in a PeerAddress)
	// value: the pids that own that address
	peerAddressCache map[PeerAddress][]uint32

	// envoyAcceptClientAddressCache finds the envoy that accepted an outbound request through
	// iptables: the service connects (local:a -> remote:b, the upstream service) while envoy
	// accepts it as (local:c -> remote:a).
	// key: the client address a (the envoy connection's remote IP:port)
	// value: the envoy connection's local address and pid
	envoyAcceptClientAddressCache map[PeerAddress]*AddressWithPid

	// processData maps a pid to its process entities.
	processData map[uint32][]api.ProcessInterface

	// localAddresses holds all known local (host) addresses.
	// key: ip
	// value: process per entity layer
	localAddresses map[string]map[string]api.ProcessInterface
}
// NewTrafficAnalyzer returns a TrafficAnalyzer bound to this context, with every cache
// initialized empty and ready to be written.
func (c *AnalyzerContext) NewTrafficAnalyzer() *TrafficAnalyzer {
	analyzer := new(TrafficAnalyzer)
	analyzer.analyzeContext = c
	analyzer.localWithPeerCache = map[LocalWithPeerAddress]*PidWithRole{}
	analyzer.peerAddressCache = map[PeerAddress][]uint32{}
	analyzer.envoyAcceptClientAddressCache = map[PeerAddress]*AddressWithPid{}
	analyzer.processData = map[uint32][]api.ProcessInterface{}
	analyzer.localAddresses = map[string]map[string]api.ProcessInterface{}
	return analyzer
}
// CombineConnectionToTraffics merges the given connections into process-level traffics.
//
// The analyzer caches are built from all connections first. Then each connection is merged
// into one of two groups:
//   - when its remote pid can be resolved, into a traffic keyed by (local pid, remote pid, role);
//   - otherwise, when its remote address is complete, into a traffic keyed by the local pid and
//     remote IP. Non-server roles (client or still unknown) are grouped as client and are also
//     keyed by the remote port.
//
// Connections that have neither a resolved remote pid nor a complete remote address are skipped.
// The order of the returned traffics is not deterministic (both groups are maps).
func (t *TrafficAnalyzer) CombineConnectionToTraffics(connections []*ConnectionContext) []*ProcessTraffic {
	// build cache first, the remote pid lookups below depend on it
	t.buildCache(connections)

	pidMatchedTraffic := make(map[PidMatchTrafficKey]*ProcessTraffic)
	pidToRemoteTraffic := make(map[PidToRemoteTrafficKey]*ProcessTraffic)
	for _, con := range connections {
		// findRemotePid may update con.Role (and con's remote address), so it must run before the key is built
		if remotePid := t.findRemotePid(con); remotePid != 0 {
			key := PidMatchTrafficKey{
				LocalPid:  con.LocalPid,
				RemotePid: remotePid,
				Role:      con.Role,
			}
			traffic := t.generateOrCombineTraffic(pidMatchedTraffic[key], con, remotePid)
			pidMatchedTraffic[key] = traffic
			log.Debugf("save pid match traffic[%d_%d_%d], %s:%d(%d)->%s:%d(%d), combine connection id: %d_%d",
				con.LocalPid, remotePid, con.Role, traffic.LocalIP, traffic.LocalPort, traffic.LocalPid,
				traffic.RemoteIP, traffic.RemotePort, traffic.RemotePid, con.ConnectionID, con.RandomID)
			continue
		}

		// if the remote IP and Port is empty, then ignore it
		if !t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
			continue
		}
		t.tryingToGenerateTheRoleWhenRemotePidCannotFound(con)

		key := PidToRemoteTrafficKey{
			LocalPid: con.LocalPid,
			RemoteIP: con.RemoteIP,
			Role:     events.ConnectionRoleServer,
		}
		// if connection role is not server side, uniformly identify it as a client
		// and distinguish it by the remote port as well
		if con.Role != events.ConnectionRoleServer {
			key.Role = events.ConnectionRoleClient
			key.RemotePort = con.RemotePort
		}
		traffic := t.generateOrCombineTraffic(pidToRemoteTraffic[key], con, 0)
		pidToRemoteTraffic[key] = traffic
		// log the key the traffic was actually saved under (role normalized, port dropped for servers)
		log.Debugf("save remote address traffic[%d_%d_%s_%d], %s:%d(%d)->%s:%d(%d)",
			key.LocalPid, key.Role, key.RemoteIP, key.RemotePort, traffic.LocalIP, traffic.LocalPort,
			traffic.LocalPid, traffic.RemoteIP, traffic.RemotePort, traffic.RemotePid)
	}

	// combine all result
	result := make([]*ProcessTraffic, 0, len(pidMatchedTraffic)+len(pidToRemoteTraffic))
	for _, v := range pidMatchedTraffic {
		result = append(result, v)
	}
	for _, v := range pidToRemoteTraffic {
		result = append(result, v)
	}
	return result
}
// GenerateConnectionInfo renders a human-readable "local -> remote" description of the traffic.
// Each side is rendered from its first process entity when one is known, otherwise from its
// raw "ip:port(pid)". Each side always shows its own address and pid.
func (t *ProcessTraffic) GenerateConnectionInfo() string {
	localInfo := fmt.Sprintf("%s:%d(%d)", t.LocalIP, t.LocalPort, t.LocalPid)
	if len(t.LocalProcesses) > 0 {
		localInfo = t.generateProcessInfoWithAddress(t.LocalProcesses[0], t.LocalIP, t.LocalPort, t.LocalPid)
	}
	remoteInfo := fmt.Sprintf("%s:%d(%d)", t.RemoteIP, t.RemotePort, t.RemotePid)
	if len(t.RemoteProcesses) > 0 {
		// previously rendered with the local address, which mislabeled the remote side
		remoteInfo = t.generateProcessInfoWithAddress(t.RemoteProcesses[0], t.RemoteIP, t.RemotePort, t.RemotePid)
	}
	return fmt.Sprintf("%s -> %s", localInfo, remoteInfo)
}

// generateProcessInfo renders p together with the traffic's LOCAL address and pid.
// It is kept for existing callers; use generateProcessInfoWithAddress to render a remote process.
func (t *ProcessTraffic) generateProcessInfo(p api.ProcessInterface) string {
	return t.generateProcessInfoWithAddress(p, t.LocalIP, t.LocalPort, t.LocalPid)
}

// generateProcessInfoWithAddress renders p as "(layer)service:instance:process(ip:port)(pid)"
// using the given address and pid.
func (t *ProcessTraffic) generateProcessInfoWithAddress(p api.ProcessInterface, ip string, port uint16, pid uint32) string {
	entity := p.Entity()
	return fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)", entity.Layer, entity.ServiceName,
		entity.InstanceName, entity.ProcessName, ip, port, pid)
}
// RemoteProcessIsProfiling reports whether at least one remote process entity is attached
// to this traffic.
func (t *ProcessTraffic) RemoteProcessIsProfiling() bool {
	remoteCount := len(t.RemoteProcesses)
	return remoteCount != 0
}
// tryingToGenerateTheRoleWhenRemotePidCannotFound infers the role of a connection whose role
// is still unknown. If the first local process exposes the local port, the connection is
// treated as server side; otherwise it is treated as client side. The result is written into
// con.Role. Connections that already have a role, have no local port, or have no local
// process are left untouched.
func (t *TrafficAnalyzer) tryingToGenerateTheRoleWhenRemotePidCannotFound(con *ConnectionContext) {
	inferable := con.Role == events.ConnectionRoleUnknown &&
		con.LocalPort != 0 && len(con.LocalProcesses) != 0
	if !inferable {
		return
	}

	var role events.ConnectionRole
	switch {
	case con.LocalProcesses[0].PortIsExpose(int(con.LocalPort)):
		// an exposed local port usually means this side accepted the connection
		role = events.ConnectionRoleServer
	default:
		role = events.ConnectionRoleClient
	}
	con.Role = role

	log.Debugf("found current connection role is unknown, analyzed role is %s through local port. %s:%d(%d)->%s:%d",
		role.String(), con.LocalIP, con.LocalPort, con.LocalPid, con.RemoteIP, con.RemotePort)
}
// generateOrCombineTraffic merges con into traffic and returns the result. When traffic is nil,
// a new traffic is created from con's local side, with fresh metrics.
//
// Fields still unset on the traffic (local processes, role, protocol, SSL flag) are filled from
// con. The remote address is always overwritten with con's. A non-zero remotePid also sets
// RemotePid and RemoteProcesses from the process cache. Finally, con's metrics are merged into
// the traffic and con.FlushDataCount is incremented.
func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic *ProcessTraffic, con *ConnectionContext, remotePid uint32) *ProcessTraffic {
	merged := traffic
	if merged == nil {
		merged = &ProcessTraffic{
			Analyzer:       t,
			LocalIP:        con.LocalIP,
			LocalPort:      con.LocalPort,
			LocalPid:       con.LocalPid,
			LocalProcesses: con.LocalProcesses,
			Metrics:        t.analyzeContext.NewConnectionMetrics(),
		}
	}

	// fill only what is still missing, the first known value wins
	if len(con.LocalProcesses) != 0 && len(merged.LocalProcesses) == 0 {
		merged.LocalProcesses = con.LocalProcesses
	}
	if con.Role != events.ConnectionRoleUnknown && merged.Role == events.ConnectionRoleUnknown {
		merged.Role = con.Role
	}
	if con.Protocol != events.ConnectionProtocolUnknown && merged.Protocol == events.ConnectionProtocolUnknown {
		merged.Protocol = con.Protocol
	}
	if con.IsSSL {
		merged.IsSSL = true
	}

	if remotePid != 0 {
		merged.RemotePid = remotePid
		merged.RemoteProcesses = t.processData[remotePid]
	}
	merged.RemoteIP, merged.RemotePort = con.RemoteIP, con.RemotePort

	// flush connection metrics
	merged.Metrics.MergeMetricsFromConnection(con)
	con.FlushDataCount++
	return merged
}
// IsLocalAddressInCache reports whether at least one local process has been cached for ip.
func (t *TrafficAnalyzer) IsLocalAddressInCache(ip string) bool {
	processesByLayer := t.localAddresses[ip]
	return len(processesByLayer) != 0
}
// buildCache indexes the connections into the analyzer caches used by the remote pid lookups:
//   - localWithPeerCache: full local+remote address -> local pid and role, for connections
//     with both addresses set;
//   - peerAddressCache: local address -> local pids, for connections with a local address;
//   - localAddresses: local IP -> process per entity layer, for connections with a local
//     address and known processes;
//   - envoyAcceptClientAddressCache: remote address -> local address and pid, for server-side
//     connections of an envoy executable that have a remote address but no local address;
//   - processData: local pid -> local processes (the first non-empty value wins).
//
// Each connection is also passed to processExportPortAnalyze.
func (t *TrafficAnalyzer) buildCache(connections []*ConnectionContext) {
	for _, conn := range connections {
		hasLocal := t.ipNotEmpty(conn.LocalIP, conn.LocalPort)
		hasRemote := t.ipNotEmpty(conn.RemoteIP, conn.RemotePort)

		if hasLocal && hasRemote {
			fullAddress := LocalWithPeerAddress{
				LocalIP:    conn.LocalIP,
				LocalPort:  conn.LocalPort,
				RemoteIP:   conn.RemoteIP,
				RemotePort: conn.RemotePort,
			}
			t.localWithPeerCache[fullAddress] = &PidWithRole{Pid: conn.LocalPid, Role: conn.Role}
		}

		switch {
		case hasLocal:
			// other connections reach this process when their remote address equals our local one
			asPeer := PeerAddress{RemoteIP: conn.LocalIP, RemotePort: conn.LocalPort}
			t.peerAddressCache[asPeer] = append(t.peerAddressCache[asPeer], conn.LocalPid)

			if len(conn.LocalProcesses) != 0 {
				byLayer := t.localAddresses[conn.LocalIP]
				if len(byLayer) == 0 {
					byLayer = make(map[string]api.ProcessInterface)
					t.localAddresses[conn.LocalIP] = byLayer
				}
				for _, p := range conn.LocalProcesses {
					byLayer[p.Entity().Layer] = p
				}
			}
		case hasRemote && conn.Role == events.ConnectionRoleServer && len(conn.LocalProcesses) != 0:
			// if server side is envoy, remember which client address it accepted
			exeName, err := conn.LocalProcesses[0].ExeName()
			if err != nil {
				log.Warnf("get process exe name failure, pid: %d, error: %v", conn.LocalPid, err)
			}
			if exeName == processEnvoy {
				clientAddress := PeerAddress{RemoteIP: conn.RemoteIP, RemotePort: conn.RemotePort}
				t.envoyAcceptClientAddressCache[clientAddress] = &AddressWithPid{
					RemoteIP:   conn.LocalIP,
					RemotePort: conn.LocalPort,
					Pid:        conn.LocalPid,
				}
			}
		}

		if len(t.processData[conn.LocalPid]) == 0 {
			t.processData[conn.LocalPid] = conn.LocalProcesses
		}
		t.processExportPortAnalyze(conn)
	}
}
// processExportPortAnalyze registers con's local port as a detected exposed port on every
// local process. This happens only when the connection is server side, its local processes
// are known, and its local port is set.
func (t *TrafficAnalyzer) processExportPortAnalyze(con *ConnectionContext) {
	if con.Role != events.ConnectionRoleServer || con.LocalPort == 0 || len(con.LocalProcesses) == 0 {
		return
	}
	exposedPort := int(con.LocalPort)
	for _, p := range con.LocalProcesses {
		p.DetectNewExposePort(exposedPort)
	}
}
// findRemotePid resolves the pid of the process on the other end of con. It tries, in order:
// the full address match, the remote-address-only match, and the mesh environment heuristics.
// It returns the first non-zero pid found, or 0 when none matches.
func (t *TrafficAnalyzer) findRemotePid(con *ConnectionContext) uint32 {
	strategies := [...]func(*ConnectionContext) uint32{
		t.findRemotePidWhenContainsFullAddress,
		t.findRemotePidWhenContainsRemoteAddress,
		t.findRemotePidWhenMeshEnvironment,
	}
	for _, find := range strategies {
		if pid := find(con); pid != 0 {
			return pid
		}
	}
	return 0
}
// findRemotePidWhenContainsFullAddress resolves the remote pid of a connection whose local and
// remote addresses are both known.
//
// It first looks for the peer's view of the same connection (addresses swapped) in
// localWithPeerCache. On a hit, an unknown con.Role is replaced by the reverse of the peer's
// known role. Otherwise, a client-side connection is checked against the envoy accept cache.
// On a hit there, con's remote address is rewritten to envoy's address (when that address is
// known) and envoy's pid is returned. Returns 0 when nothing matches.
func (t *TrafficAnalyzer) findRemotePidWhenContainsFullAddress(con *ConnectionContext) uint32 {
	if !t.ipNotEmpty(con.LocalIP, con.LocalPort) || !t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
		return 0
	}

	// the peer recorded this connection with local and remote swapped
	swapped := LocalWithPeerAddress{
		LocalIP:    con.RemoteIP,
		LocalPort:  con.RemotePort,
		RemoteIP:   con.LocalIP,
		RemotePort: con.LocalPort,
	}
	if peer := t.localWithPeerCache[swapped]; peer != nil {
		log.Debugf("found in peer cache: %s:%d->%s:%d, pid: %d", con.RemoteIP, con.RemotePort, con.LocalIP, con.LocalPort, peer.Pid)
		// e.g. cur:(a->b) unknown, remote:(b->a) client, so the current connection must be the server
		if con.Role == events.ConnectionRoleUnknown && peer.Role != events.ConnectionRoleUnknown {
			con.Role = peer.Role.Revert()
		}
		return peer.Pid
	}
	log.Debugf("not found in peer cache: %s:%d->%s:%d", con.RemoteIP, con.RemotePort, con.LocalIP, con.LocalPort)

	// only a client whose local address was accepted by envoy can be matched below
	if con.Role != events.ConnectionRoleClient {
		return 0
	}
	envoy := t.envoyAcceptClientAddressCache[PeerAddress{
		RemoteIP:   con.LocalIP,
		RemotePort: con.LocalPort,
	}]
	if envoy == nil {
		log.Debugf("not envoy connection: %s:%d->%s:%d", con.LocalIP, con.LocalPort, con.RemoteIP, con.RemotePort)
		return 0
	}
	// point the connection at the real (envoy) address when it is known
	if t.ipNotEmpty(envoy.RemoteIP, envoy.RemotePort) {
		con.RemoteIP, con.RemotePort = envoy.RemoteIP, envoy.RemotePort
	}
	log.Debugf("found envoy connection: %s:%d->%s:%d", con.LocalIP, con.LocalPort, con.RemoteIP, con.RemotePort)
	return envoy.Pid
}
// findRemotePidWhenContainsRemoteAddress resolves the remote pid from the remote address only,
// by looking up the processes whose local address equals con's remote address.
//
// The role is deliberately not verified, so that a pid is found whenever possible: role
// information may be missing when connect/accept was not observed. When several pids share the
// address, the last one that differs from con.LocalPid is returned; if every cached pid equals
// con.LocalPid, that pid is returned. Returns 0 when the address is incomplete or unknown.
func (t *TrafficAnalyzer) findRemotePidWhenContainsRemoteAddress(con *ConnectionContext) uint32 {
	if !t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
		return 0
	}
	candidates := t.peerAddressCache[PeerAddress{
		RemoteIP:   con.RemoteIP,
		RemotePort: con.RemotePort,
	}]
	if len(candidates) == 0 {
		return 0
	}

	// the remote peer is usually not the process itself, so prefer a foreign pid,
	// scanning from the most recently cached one
	chosen := candidates[0]
	for i := len(candidates) - 1; i >= 0; i-- {
		if candidates[i] != con.LocalPid {
			chosen = candidates[i]
			break
		}
	}
	log.Debugf("found remote address by peer address cache: %s:%d -> %d", con.RemoteIP, con.RemotePort, chosen)
	return chosen
}
// findRemotePidWhenMeshEnvironment is the mesh fallback used when the remote process cannot be
// matched by address.
//   - For a MESH_DP local process, it returns the pid of the MESH application cached under the
//     remote IP.
//   - For a MESH local process whose remote IP is neither a cached local address nor a localhost
//     address, it returns the pid of the envoy MESH_DP process of the same service instance.
//
// Returns 0 when no local process produces a match.
func (t *TrafficAnalyzer) findRemotePidWhenMeshEnvironment(con *ConnectionContext) uint32 {
	if len(con.LocalProcesses) == 0 || !t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
		return 0
	}
	for _, local := range con.LocalProcesses {
		switch local.Entity().Layer {
		case layerMeshDP:
			// the data plane talks to the mesh application living on the remote IP
			if app := t.localAddresses[con.RemoteIP][layerMeshApp]; app != nil {
				log.Debugf("found in the mesh application, remote ip: %s", con.RemoteIP)
				return uint32(app.Pid())
			}
		case layerMeshApp:
			// a mesh application sending to a non-local address goes through its MESH_DP
			remoteIsLocal := len(t.localAddresses[con.RemoteIP]) != 0 || tools.IsLocalHostAddress(con.RemoteIP)
			if remoteIsLocal {
				continue
			}
			if envoyPid := t.findSameInstanceMeshDP(local.Entity()); envoyPid != 0 {
				log.Debugf("found in the mesh data plane, remote ip: %s", con.RemoteIP)
				return envoyPid
			}
		}
	}
	return 0
}
// findSameInstanceMeshDP returns the pid of an envoy process in the MESH_DP layer that belongs
// to the same service and instance as entity. It returns 0 when none is found. Processes whose
// executable name cannot be read are logged and skipped.
func (t *TrafficAnalyzer) findSameInstanceMeshDP(entity *api.ProcessEntity) uint32 {
	for _, processes := range t.analyzeContext.processes {
		for _, candidate := range processes {
			candidateEntity := candidate.Entity()
			sameInstance := candidateEntity.Layer == layerMeshDP &&
				candidateEntity.ServiceName == entity.ServiceName &&
				candidateEntity.InstanceName == entity.InstanceName
			if !sameInstance {
				continue
			}
			exeName, err := candidate.ExeName()
			if err != nil {
				log.Warnf("query the process execute file name failure: %d, error: %v", candidate.Pid(), err)
				continue
			}
			if exeName == processEnvoy {
				return uint32(candidate.Pid())
			}
		}
	}
	return 0
}
// LocalWithPeerAddress is the full address of a connection (both ends) as seen from its local
// side; it is the key of TrafficAnalyzer.localWithPeerCache.
type LocalWithPeerAddress struct {
	LocalIP    string
	LocalPort  uint16
	RemoteIP   string
	RemotePort uint16
}
// PeerAddress is a single IP:port used as a cache key. Despite the field names, the traffic
// analyzer also stores connections' local addresses in it (see the peerAddressCache population).
type PeerAddress struct {
	RemoteIP   string
	RemotePort uint16
}
// AddressWithPid is an IP:port with the pid that owns it; in the envoy accept cache it holds
// the envoy connection's local address and the envoy pid.
type AddressWithPid struct {
	RemoteIP   string
	RemotePort uint16
	Pid        uint32
}
// PidMatchTrafficKey groups connections whose remote process pid has been resolved.
type PidMatchTrafficKey struct {
	LocalPid  uint32
	RemotePid uint32
	Role      events.ConnectionRole
}
// PidToRemoteTrafficKey groups connections whose remote pid is unknown, by local pid and remote
// address. In CombineConnectionToTraffics, RemotePort is left as 0 for server-side connections,
// so all clients hitting a server from the same IP share one traffic.
type PidToRemoteTrafficKey struct {
	LocalPid   uint32
	Role       events.ConnectionRole
	RemoteIP   string
	RemotePort uint16
}
// PidWithRole is the local pid and role recorded for a cached connection.
type PidWithRole struct {
	Pid  uint32
	Role events.ConnectionRole
}
// ipNotEmpty reports whether the address is complete, meaning the IP is non-empty AND the
// port is non-zero.
func (t *TrafficAnalyzer) ipNotEmpty(ip string, port uint16) bool {
	if len(ip) == 0 {
		return false
	}
	return port != 0
}