blob: c39d690d2ecf20a1a3174d5cae270cd413f85118 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package network
import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/cilium/ebpf"
"github.com/hashicorp/go-multierror"
"github.com/cilium/ebpf/link"
"github.com/apache/skywalking-rover/pkg/core"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze"
analyzeBase "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)
// log is the package-level logger shared by the network profiling task runner.
var log = logger.GetLogger("profiling", "task", "network")
// Runner owns the state of the network profiling task: the loaded BPF
// programs, the set of monitored processes, the analyzer context and the
// periodic metrics reporter.
type Runner struct {
// initOnce makes init run init0 at most once.
initOnce sync.Once
// startLock serializes Start and Stop.
startLock sync.Mutex
// stopOnce makes Stop close the BPF loader at most once.
stopOnce sync.Once
// meterClient is created in init0 from the backend connection and is used by
// flushMetrics to open the batch stream.
meterClient v3.MeterReportServiceClient
// reportInterval is parsed from config.Network.ReportInterval and is the
// period of the metrics reporter ticker.
reportInterval time.Duration
// meterPrefix is config.Network.MeterPrefix with a trailing "_"; it is handed
// to the analyzer when flushing metrics.
meterPrefix string
// bpf is nil until Start has loaded the BPF programs.
bpf *bpf.Loader
// processes maps a pid to the process entities registered for it. The same
// map instance is passed to analyze.NewContext in NewGlobalRunnerContext.
processes map[int32][]api.ProcessInterface
analyzeContext *analyzeBase.AnalyzerContext
// ctx and cancel are derived from the context given to Start; Stop invokes
// cancel.
ctx context.Context
cancel context.CancelFunc
}
// NewGlobalRunnerContext creates a Runner with an empty process registry.
// The analyzer context is built over the very same map instance, so it sees
// every process the runner registers or removes later on.
func NewGlobalRunnerContext() *Runner {
	runner := &Runner{
		processes: make(map[int32][]api.ProcessInterface),
	}
	runner.analyzeContext = analyze.NewContext(runner.processes)
	return runner
}
// init performs the one-time initialization of the runner by delegating to
// init0 under initOnce.
//
// Only the call that actually executes init0 can observe its error: every
// later call skips the body and therefore returns nil.
func (r *Runner) init(config *base.TaskConfig, moduleMgr *module.Manager) (err error) {
	initialize := func() {
		err = r.init0(config, moduleMgr)
	}
	r.initOnce.Do(initialize)
	return err
}
// DeleteProcesses unregisters the given process entities from the runner.
//
// An entity is matched by its ID among the entities stored under its pid.
// When the last entity of a pid is removed, the pid is also deleted from the
// BPF process monitor control map. Failures of that map deletion are
// collected and returned together; they do not stop the loop.
//
// The boolean result reports whether no process is registered anymore.
//
// NOTE(review): unlike Start and Stop this method does not take startLock;
// confirm that callers serialize access to the process registry.
func (r *Runner) DeleteProcesses(processes []api.ProcessInterface) (bool, error) {
	var result error
	for _, target := range processes {
		pid := target.Pid()
		current := r.processes[pid]

		// keep every entity of this pid except the one being removed
		var remaining []api.ProcessInterface
		for _, candidate := range current {
			if target.ID() != candidate.ID() {
				remaining = append(remaining, candidate)
			}
		}

		switch {
		case len(remaining) == len(current):
			// the entity is not registered, nothing to do
		case len(remaining) > 0:
			// other entities still share this pid, keep monitoring it
			r.processes[pid] = remaining
		default:
			// this was the last entity of the pid, stop monitoring it
			if deleteErr := r.bpf.ProcessMonitorControl.Delete(uint32(pid)); deleteErr != nil {
				result = multierror.Append(result, deleteErr)
			}
			log.Debugf("delete monitor process: %d", pid)
			delete(r.processes, pid)
		}
	}
	return len(r.processes) == 0, result
}
// Start begins network monitoring for the given processes.
//
// The first call that finds no loader loads the BPF programs, derives the
// runner context from ctx, registers the processes, attaches all probes and
// launches the periodic metrics reporter. Once r.bpf is set, later calls only
// register the additional processes. Start and Stop are serialized through
// startLock.
//
// NOTE(review): r.bpf stays set when addProcesses or the probe attachment
// fails, so a later Start takes the "already started" path without attaching
// probes again; confirm whether callers are expected to retry after an error.
func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) error {
	r.startLock.Lock()
	defer r.startLock.Unlock()
	// if already started, then just add the processes
	if r.bpf != nil {
		return r.addProcesses(processes)
	}

	// load the bpf programs before deriving the context, so a load failure
	// does not leave a derived context behind that nobody cancels
	bpfLoader, err := bpf.NewLoader()
	if err != nil {
		return err
	}
	r.ctx, r.cancel = context.WithCancel(ctx)
	r.bpf = bpfLoader

	if err := r.addProcesses(processes); err != nil {
		return err
	}

	// register all handlers
	r.analyzeContext.RegisterAllHandlers(r.ctx, bpfLoader)
	r.analyzeContext.StartSocketAddressParser(r.ctx)

	// sock opts
	bpfLoader.AddSysCall("close", bpfLoader.SysClose, bpfLoader.SysCloseRet)
	bpfLoader.AddSysCall("connect", bpfLoader.SysConnect, bpfLoader.SysConnectRet)
	bpfLoader.AddSysCall("accept", bpfLoader.SysAccept, bpfLoader.SysAcceptRet)
	bpfLoader.AddSysCall("accept4", bpfLoader.SysAccept, bpfLoader.SysAcceptRet)
	bpfLoader.AddLink(link.Kretprobe, bpfLoader.SockAllocRet, "sock_alloc")
	bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpConnect, "tcp_connect")

	// write/receive data
	bpfLoader.AddSysCall("send", bpfLoader.SysSend, bpfLoader.SysSendRet)
	bpfLoader.AddSysCall("sendto", bpfLoader.SysSendto, bpfLoader.SysSendtoRet)
	bpfLoader.AddSysCall("sendmsg", bpfLoader.SysSendmsg, bpfLoader.SysSendmsgRet)
	bpfLoader.AddSysCall("sendmmsg", bpfLoader.SysSendmmsg, bpfLoader.SysSendmmsgRet)
	bpfLoader.AddSysCall("sendfile", bpfLoader.SysSendfile, bpfLoader.SysSendfileRet)
	bpfLoader.AddSysCall("sendfile64", bpfLoader.SysSendfile, bpfLoader.SysSendfileRet)
	bpfLoader.AddSysCall("write", bpfLoader.SysWrite, bpfLoader.SysWriteRet)
	bpfLoader.AddSysCall("writev", bpfLoader.SysWritev, bpfLoader.SysWritevRet)
	bpfLoader.AddSysCall("read", bpfLoader.SysRead, bpfLoader.SysReadRet)
	bpfLoader.AddSysCall("readv", bpfLoader.SysReadv, bpfLoader.SysReadvRet)
	bpfLoader.AddSysCall("recv", bpfLoader.SysRecv, bpfLoader.SysRecvRet)
	bpfLoader.AddSysCall("recvfrom", bpfLoader.SysRecvfrom, bpfLoader.SysRecvfromRet)
	bpfLoader.AddSysCall("recvmsg", bpfLoader.SysRecvmsg, bpfLoader.SysRecvmsgRet)
	bpfLoader.AddSysCall("recvmmsg", bpfLoader.SysRecvmmsg, bpfLoader.SysRecvmmsgRet)
	bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpRcvEstablished, "tcp_rcv_established")
	bpfLoader.AddLink(link.Kprobe, bpfLoader.SecuritySocketSendmsg, "security_socket_sendmsg")
	bpfLoader.AddLink(link.Kprobe, bpfLoader.SecuritySocketRecvmsg, "security_socket_recvmsg")

	// retransmit/drop
	bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpRetransmit, "tcp_retransmit_skb")
	bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpDrop, "tcp_drop")

	if err := bpfLoader.HasError(); err != nil {
		// the loader is closed right below, so whatever was bound to r.ctx
		// above must be told to stop as well
		r.cancel()
		// the attach error is the one worth reporting; a close failure here
		// would only hide it
		_ = bpfLoader.Close()
		return err
	}

	// metrics report
	r.registerMetricsReport()
	return nil
}
// registerMetricsReport launches the background goroutine that flushes the
// metrics once per reportInterval. A failed flush is logged and the loop
// keeps running; the goroutine exits when r.ctx is done.
func (r *Runner) registerMetricsReport() {
	go func() {
		ticker := time.NewTicker(r.reportInterval)
		// release the ticker whenever the loop is left
		defer ticker.Stop()
		for {
			select {
			case <-r.ctx.Done():
				return
			case <-ticker.C:
				flushErr := r.flushMetrics()
				if flushErr != nil {
					log.Errorf("flush network monitoing metrics failure: %v", flushErr)
				}
			}
		}
	}()
}
// flushMetrics asks the analyzer context to flush all metrics, builds them
// and sends every built item over a freshly opened batch stream.
//
// It returns the first error from flushing, opening the stream or sending.
// The stream is always closed once it has been opened; a close failure is
// only logged as a warning.
func (r *Runner) flushMetrics() error {
	builder, err := r.analyzeContext.FlushAllMetrics(r.bpf, r.meterPrefix)
	if err != nil {
		return err
	}
	collections := builder.Build()

	stream, err := r.meterClient.CollectBatch(r.ctx)
	if err != nil {
		return err
	}
	defer func() {
		_, closeErr := stream.CloseAndRecv()
		if closeErr != nil {
			log.Warnf("close the metrics stream error: %v", closeErr)
		}
	}()

	// total counts the meter data entries, it is only used for the summary log
	total := 0
	for _, collection := range collections {
		total += len(collection.MeterData)
		if sendErr := stream.Send(collection); sendErr != nil {
			return sendErr
		}
	}
	if total > 0 {
		log.Infof("total send network topology meter data: %d", total)
	}
	return nil
}
// Stop cancels the runner context and closes the BPF loader.
//
// It takes startLock, so it waits for a Start that is still in progress.
// The loader is closed at most once; later calls return nil. When no loader
// has been created yet, Stop only cancels the context (if any) and returns
// nil without consuming stopOnce.
func (r *Runner) Stop() error {
	// if starting, then need to wait start finished
	r.startLock.Lock()
	defer r.startLock.Unlock()
	if r.cancel != nil {
		r.cancel()
	}
	// A nil *bpf.Loader converted to io.Closer is a non-nil interface value,
	// so the nil check inside closeWhenExists cannot catch it and Close would
	// be invoked on a nil loader. Check the concrete pointer here instead.
	if r.bpf == nil {
		return nil
	}
	var result error
	r.stopOnce.Do(func() {
		result = r.closeWhenExists(result, r.bpf)
	})
	return result
}
// closeWhenExists closes c when it is a non-nil interface value and appends
// a close failure to err. The possibly extended error is returned.
//
// An interface holding a nil pointer is not equal to nil, so callers must
// check concrete pointers themselves before passing them in.
func (r *Runner) closeWhenExists(err error, c io.Closer) error {
	if c != nil {
		if closeErr := c.Close(); closeErr != nil {
			return multierror.Append(err, closeErr)
		}
	}
	return err
}
// init0 wires the runner to the backend and validates the network task
// configuration.
//
// It creates the meter report client from the core module's backend
// connection, parses config.Network.ReportInterval, derives the meter prefix
// from config.Network.MeterPrefix (with a trailing "_") and initializes the
// analyzer context.
//
// An error is returned when the core module does not provide core.Operator,
// the report interval cannot be parsed or is not positive, the meter prefix
// is empty, or the analyzer context fails to initialize.
func (r *Runner) init0(config *base.TaskConfig, moduleMgr *module.Manager) error {
	// checked assertion: report a missing core module instead of panicking
	coreOperator, ok := moduleMgr.FindModule(core.ModuleName).(core.Operator)
	if !ok {
		return fmt.Errorf("the %v module is not available as core operator", core.ModuleName)
	}
	connection := coreOperator.BackendOperator().GetConnection()
	r.meterClient = v3.NewMeterReportServiceClient(connection)

	reportInterval, err := time.ParseDuration(config.Network.ReportInterval)
	if err != nil {
		return fmt.Errorf("parsing report interval failure: %w", err)
	}
	// the interval drives a time.Ticker, which panics on a non-positive period
	if reportInterval <= 0 {
		return fmt.Errorf("the report interval must be positive, got %q", config.Network.ReportInterval)
	}
	r.reportInterval = reportInterval

	if config.Network.MeterPrefix == "" {
		return fmt.Errorf("please provide the meter prefix")
	}
	r.meterPrefix = config.Network.MeterPrefix + "_"

	if err := r.analyzeContext.Init(config, moduleMgr); err != nil {
		return fmt.Errorf("init analyzer failure: %w", err)
	}
	return nil
}
// addProcesses registers the given process entities for monitoring.
//
// An entity whose ID is already stored under its pid is skipped. Every new
// entity is recorded first, then its pid is written to the BPF process
// monitor control map and its SSL configuration is added. Failures of these
// two steps are collected and returned together; they do not stop the loop
// and do not undo the registration.
func (r *Runner) addProcesses(processes []api.ProcessInterface) error {
	var result error
nextProcess:
	for _, candidate := range processes {
		pid := candidate.Pid()
		for _, known := range r.processes[pid] {
			if candidate.ID() == known.ID() {
				// this entity is already monitored
				continue nextProcess
			}
		}
		r.processes[pid] = append(r.processes[pid], candidate)

		// mark the pid in the control map so that it gets monitored
		updateErr := r.bpf.ProcessMonitorControl.Update(uint32(pid), uint32(1), ebpf.UpdateAny)
		if updateErr != nil {
			result = multierror.Append(result, updateErr)
		}

		// add process ssl config
		sslErr := addSSLProcess(int(pid), r.bpf)
		if sslErr != nil {
			result = multierror.Append(result, sslErr)
		}
		log.Debugf("add monitor process, pid: %d", pid)
	}
	return result
}