blob: be5c1686363e11c39bf53a7b0e061fe694319017 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package collector
import (
"context"
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/cache"
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/tools/elf"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
"github.com/apache/skywalking-rover/pkg/tools/ip"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
"github.com/shirou/gopsutil/process"
)
var (
// ZTunnelProcessFinderInterval is the interval between two scans for the ztunnel process.
ZTunnelProcessFinderInterval = time.Second * 30
// ZTunnelTrackBoundSymbolPrefix is the prefix of the mangled symbol name that is probed to
// track outbound connections in the ztunnel process. It corresponds to:
// ztunnel::proxy::connection_manager::ConnectionManager::track_outbound
ZTunnelTrackBoundSymbolPrefix = "_ZN7ztunnel5proxy18connection_manager17ConnectionManager14track_outbound"
)
// zTunnelCollectInstance is the package-level ZTunnelCollector; its cached IP mappings expire after one minute.
var zTunnelCollectInstance = NewZTunnelCollector(time.Minute)
// ZTunnelCollector is a collector for the ztunnel process in the Ambient Istio scenario.
type ZTunnelCollector struct {
// ctx and cancel are created in Start from the access log runtime context; Stop calls cancel.
ctx context.Context
cancel context.CancelFunc
// alc is the access log context handed to Start.
alc *common.AccessLogContext
// collectingProcess is the ztunnel process selected by the last successful scan; nil until one is found.
collectingProcess *process.Process
// ipMappingCache holds *ZTunnelLoadBalanceAddress values keyed by the string built in buildIPMappingCacheKey.
ipMappingCache *cache.Expiring
// ipMappingExpireDuration is the TTL used for every entry written into ipMappingCache.
ipMappingExpireDuration time.Duration
}
// NewZTunnelCollector builds a collector with an empty IP mapping cache.
// expireTime is the TTL later applied to every mapping stored in that cache.
func NewZTunnelCollector(expireTime time.Duration) *ZTunnelCollector {
	collector := new(ZTunnelCollector)
	collector.ipMappingCache = cache.NewExpiring()
	collector.ipMappingExpireDuration = expireTime
	return collector
}
// Start wires the collector into the access log context: it registers itself as a flush
// listener, performs a first scan for the ztunnel process, and, when a process was found,
// starts reading load-balance socket mapping events and re-scanning for the process every
// ZTunnelProcessFinderInterval until the collector context is cancelled.
//
// It returns the error of the first scan, if any. When the first scan finds no ztunnel
// process, it returns nil without reading events and without starting the periodic re-scan.
func (z *ZTunnelCollector) Start(_ *module.Manager, ctx *common.AccessLogContext) error {
	z.alc = ctx
	z.ctx, z.cancel = context.WithCancel(ctx.RuntimeContext)
	ctx.ConnectionMgr.RegisterNewFlushListener(z)

	if err := z.findZTunnelProcessAndCollect(); err != nil {
		return err
	}
	if z.collectingProcess == nil {
		return nil
	}

	// allocates the container each queue event is decoded into
	newEvent := func() interface{} {
		return &events.ZTunnelSocketMappingEvent{}
	}
	// caches the load balanced destination under the original connection 4-tuple
	onEvent := func(data interface{}) {
		mapping := data.(*events.ZTunnelSocketMappingEvent)
		srcIP, srcPort := z.convertBPFIPToString(mapping.OriginalSrcIP), mapping.OriginalSrcPort
		dstIP, dstPort := z.convertBPFIPToString(mapping.OriginalDestIP), mapping.OriginalDestPort
		balancedIP := z.convertBPFIPToString(mapping.LoadBalancedDestIP)
		log.Debugf("received ztunnel lb socket mapping event: %s:%d -> %s:%d, lb: %s", srcIP, srcPort, dstIP, dstPort, balancedIP)

		target := &ZTunnelLoadBalanceAddress{
			IP:   balancedIP,
			Port: mapping.LoadBalancedDestPort,
			From: v3.ZTunnelAttachmentEnvironmentDetectBy_ZTUNNEL_OUTBOUND_FUNC,
		}
		cacheKey := z.buildIPMappingCacheKey(srcIP, int(srcPort), dstIP, int(dstPort))
		z.ipMappingCache.Set(cacheKey, target, z.ipMappingExpireDuration)
	}
	ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, onEvent, newEvent)

	// periodically re-check the ztunnel process until the collector is stopped
	go func() {
		ticker := time.NewTicker(ZTunnelProcessFinderInterval)
		defer ticker.Stop()
		for {
			select {
			case <-z.ctx.Done():
				return
			case <-ticker.C:
				if scanErr := z.findZTunnelProcessAndCollect(); scanErr != nil {
					log.Error("failed to find and collect ztunnel process: ", scanErr)
				}
			}
		}
	}()
	return nil
}
// OnConnectEvent inspects a socket connect event. When the event belongs to the tracked
// ztunnel process and the socket pair is on the client side, it caches an inbound marker
// under the swapped address pair and returns false; in every other case it returns true.
func (z *ZTunnelCollector) OnConnectEvent(e *events.SocketConnectEvent, s *ip.SocketPair) bool {
	if z.collectingProcess == nil || e == nil || s == nil {
		return true
	}
	if uint32(z.collectingProcess.Pid) != e.PID || s.Role != enums.ConnectionRoleClient {
		return true
	}

	// this is a client side (outbound) connect made by ztunnel: swap source and destination
	// so the key matches the connection as accepted by the workload application
	inboundMarker := &ZTunnelLoadBalanceAddress{
		From: v3.ZTunnelAttachmentEnvironmentDetectBy_ZTUNNEL_INBOUND_FUNC,
	}
	swappedKey := z.buildIPMappingCacheKey(s.DestIP, int(s.DestPort), s.SrcIP, int(s.SrcPort))
	z.ipMappingCache.Set(swappedKey, inboundMarker, z.ipMappingExpireDuration)
	log.Debugf("found the ztunnel outbound connection, "+
		"connection ID: %d, randomID: %d, pid: %d, fd: %d, role: %s, local: %s:%d, remote: %s:%d",
		e.ConID, e.RandomID, e.PID, e.SocketFD, enums.ConnectionRole(e.Role), s.SrcIP, s.SrcPort, s.DestIP, s.DestPort)
	return false
}
// ReadyToFlushConnection attaches ztunnel environment information to a connection that is
// about to be flushed. It looks up the connection's address pair in the IP mapping cache and,
// on a hit, sets the RPC connection attachment. Connections without socket or RPC data, or
// that already carry an attachment, are left untouched.
func (z *ZTunnelCollector) ReadyToFlushConnection(connection *common.ConnectionInfo, _ events.Event) {
	if connection == nil || connection.Socket == nil || connection.RPCConnection == nil {
		return
	}
	// keep an attachment that is already present, and skip the lookup when nothing is cached
	if connection.RPCConnection.Attachment != nil || z.ipMappingCache.Len() == 0 {
		return
	}

	socket := connection.Socket
	cacheKey := z.buildIPMappingCacheKey(socket.SrcIP, int(socket.SrcPort), socket.DestIP, int(socket.DestPort))
	cached, ok := z.ipMappingCache.Get(cacheKey)
	if !ok {
		log.Debugf("there no ztunnel mapped IP address found for connection ID: %d, random ID: %d",
			connection.ConnectionID, connection.RandomID)
		return
	}

	lbAddress := cached.(*ZTunnelLoadBalanceAddress)
	log.Debugf("found the ztunnel load balanced IP for the connection: %s, connectionID: %d, randomID: %d",
		lbAddress.String(), connection.ConnectionID, connection.RandomID)

	environment := &v3.ZTunnelAttachmentEnvironment{
		RealDestinationIp: lbAddress.IP,
		By:                lbAddress.From,
		SecurityPolicy:    v3.ZTunnelAttachmentSecurityPolicy_NONE,
	}
	// a mapping captured from the outbound function with target port 15008 means ztunnel used mTLS
	if lbAddress.From == v3.ZTunnelAttachmentEnvironmentDetectBy_ZTUNNEL_OUTBOUND_FUNC && lbAddress.Port == 15008 {
		environment.SecurityPolicy = v3.ZTunnelAttachmentSecurityPolicy_MTLS
	}
	connection.RPCConnection.Attachment = &v3.ConnectionAttachment{
		Environment: &v3.ConnectionAttachment_ZTunnel{ZTunnel: environment},
	}
}
// convertBPFIPToString renders a 32-bit IPv4 value as dotted-decimal text, taking the most
// significant byte as the first octet.
func (z *ZTunnelCollector) convertBPFIPToString(ipAddr uint32) string {
	first := ipAddr >> 24
	second := (ipAddr >> 16) & 0xff
	third := (ipAddr >> 8) & 0xff
	fourth := ipAddr & 0xff
	return fmt.Sprintf("%d.%d.%d.%d", first, second, third, fourth)
}
// buildIPMappingCacheKey builds the IP mapping cache key for a connection in the form
// "<localIP>:<localPort>-<remoteIP>:<remotePort>".
func (z *ZTunnelCollector) buildIPMappingCacheKey(localIP string, localPort int, remoteIP string, remotePort int) string {
	const keyLayout = "%s:%d-%s:%d"
	return fmt.Sprintf(keyLayout,
		localIP, localPort,
		remoteIP, remotePort,
	)
}
// Stop cancels the collector context created by Start. It does nothing when Start has not
// set a cancel function.
func (z *ZTunnelCollector) Stop() {
	stop := z.cancel
	if stop == nil {
		return
	}
	stop()
}
// findZTunnelProcessAndCollect makes sure a ztunnel process is being collected.
// If the currently tracked process is still running, it returns immediately. Otherwise it
// scans all processes, picks the first one whose executable path ends with "/ztunnel",
// records it as the tracked process and starts collecting from it.
//
// It returns an error when the process list cannot be read or collecting fails, and nil
// when no ztunnel process exists on this node.
func (z *ZTunnelCollector) findZTunnelProcessAndCollect() error {
	if current := z.collectingProcess; current != nil {
		if alive, checkErr := current.IsRunning(); checkErr == nil && alive {
			// already collecting the process
			log.Debugf("found the ztunnel process and collecting ztunnel data from pid: %d", current.Pid)
			return nil
		}
		log.Warnf("detected ztunnel process is not running, should re-scan process to find and collect it")
	}

	candidates, err := process.Processes()
	if err != nil {
		return err
	}

	var found *process.Process
	for _, candidate := range candidates {
		// processes whose executable path cannot be resolved are skipped
		exePath, exeErr := candidate.Exe()
		if exeErr == nil && strings.HasSuffix(exePath, "/ztunnel") {
			found = candidate
			break
		}
	}
	if found == nil {
		log.Debugf("ztunnel process not found is current node")
		return nil
	}

	log.Infof("ztunnel process founded in current node, pid: %d", found.Pid)
	z.collectingProcess = found
	return z.collectZTunnelProcess(found)
}
// collectZTunnelProcess attaches the outbound tracking uprobe to the executable of process p
// and publishes the process ID to the BPF program.
//
// It returns an error when the executable cannot be read as an ELF file, when no symbol with
// the ZTunnelTrackBoundSymbolPrefix prefix exists, or when the PID cannot be written to BPF.
// Underlying errors are wrapped with %w so callers can inspect them with errors.Is/errors.As.
func (z *ZTunnelCollector) collectZTunnelProcess(p *process.Process) error {
	pidExeFile := host.GetHostProcInHost(fmt.Sprintf("%d/exe", p.Pid))
	// NOTE(review): if elf.File keeps an open handle, it is never released here — confirm.
	elfFile, err := elf.NewFile(pidExeFile)
	if err != nil {
		return fmt.Errorf("read executable file error: %w", err)
	}

	trackBoundSymbol := elfFile.FilterSymbol(func(name string) bool {
		return strings.HasPrefix(name, ZTunnelTrackBoundSymbolPrefix)
	}, true)
	if len(trackBoundSymbol) == 0 {
		return fmt.Errorf("failed to find track outbound symbol in ztunnel process")
	}

	// only the first matching symbol is probed
	uprobeFile := z.alc.BPF.OpenUProbeExeFile(pidExeFile)
	uprobeFile.AddLink(trackBoundSymbol[0].Name, z.alc.BPF.ConnectionManagerTrackOutbound, nil)

	// setting the ztunnel pid in the BPF
	if err = z.alc.BPF.ZtunnelProcessPid.Set(p.Pid); err != nil {
		return fmt.Errorf("failed to set ztunnel process pid in the BPF: %w", err)
	}
	return nil
}
// ZTunnelLoadBalanceAddress is the value stored in the collector's IP mapping cache.
type ZTunnelLoadBalanceAddress struct {
// IP is the load balanced destination IP; left empty for entries created from connect events.
IP string
// Port is the load balanced destination port; left zero for entries created from connect events.
Port uint16
// From records which detection path produced this entry.
From v3.ZTunnelAttachmentEnvironmentDetectBy
}
// String formats the address as "<IP>:<Port>(<From>)".
func (z *ZTunnelLoadBalanceAddress) String() string {
	host, port, detectedBy := z.IP, z.Port, z.From
	return fmt.Sprintf("%s:%d(%s)", host, port, detectedBy)
}