blob: 4f490bcfc6325c376d5c8c4b197f6d2e601fdf28 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//go:build linux
package oncpu
import (
v3 ""
// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
// nolint
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf $REPO_ROOT/bpf/profiling/oncpu.c -- -I$REPO_ROOT/bpf/include
// log is the package-wide logger of the ON_CPU profiling task.
var (
	log = logger.GetLogger("profiling", "task", "oncpu")
)
// Event is the key of the eBPF Counts map: the IDs of the user-space and the
// kernel-space stack of a sample, both looked up in the Stacks map when the
// data is flushed.
//
// The value is decoded straight from map memory, so the field order and widths
// presumably have to mirror the key struct declared in oncpu.c; keep them in
// sync.
type Event struct {
	UserStackID, KernelStackID uint32
}
type Runner struct {
base *base.Runner
pid int32
processProfiling *profiling.Info
kernelProfiling *profiling.Info
dumpFrequency int64
// runtime
perfEventFds []int
bpf *bpfObjects
stackCounter map[Event]uint32
flushDataNotify context.CancelFunc
stopChan chan bool
func NewRunner(config *base.TaskConfig) (base.ProfileTaskRunner, error) {
if config.OnCPU.Period == "" {
return nil, fmt.Errorf("please provide the ON_CPU dump period")
dumpPeriod, err := time.ParseDuration(config.OnCPU.Period)
if err != nil {
return nil, fmt.Errorf("the ON_CPU dump period format not right, current value: %s", config.OnCPU.Period)
if dumpPeriod < time.Millisecond {
return nil, fmt.Errorf("the ON_CPU dump period could not be smaller than 1ms")
return &Runner{
base: base.NewBaseRunner(),
dumpFrequency: time.Second.Milliseconds() / dumpPeriod.Milliseconds(),
}, nil
func (r *Runner) Init(task *base.ProfilingTask, process api.ProcessInterface) error { = process.Pid()
// process profiling stat
if r.processProfiling = process.ProfilingStat(); r.processProfiling == nil {
return fmt.Errorf("this process could not be profiling")
// kernel profiling stat
kernelProfiling, err := tools.KernelFileProfilingStat()
if err != nil {
log.Warnf("could not analyze kernel profiling stats: %v", err)
r.kernelProfiling = kernelProfiling
r.stackCounter = make(map[Event]uint32)
r.stopChan = make(chan bool, 1)
return nil
func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNotify) error {
// load bpf
objs := bpfObjects{}
spec, err := loadBpf()
if err != nil {
return err
// update the monitor pid
funcName := "do_perf_event"
replacedPid := false
for i, ins := range spec.Programs[funcName].Instructions {
if ins.Reference == "MONITOR_PID" {
spec.Programs[funcName].Instructions[i].Constant = int64(
spec.Programs[funcName].Instructions[i].Offset = 0
replacedPid = true
if !replacedPid {
return fmt.Errorf("replace the monitor pid failure")
if err1 := spec.LoadAndAssign(&objs, nil); err1 != nil {
log.Fatalf("loading objects: %s", err1)
defer objs.Close()
r.bpf = &objs
// opened perf events
perfEvents, err := r.openPerfEvent(objs.DoPerfEvent.FD())
r.perfEventFds = perfEvents
if err != nil {
return err
// notify start success
runtime.SetFinalizer(r, (*Runner).Stop)
return nil
// openPerfEvent opens one perf event per iteration over runtime.NumCPU()
// (presumably bound to CPU cpuNum). It attaches the eBPF program whose fd is
// perfFd to each event and enables it. The events sample in frequency mode
// (unix.PerfBitFreq) at r.dumpFrequency samples per second.
//
// It returns the fds collected so far together with the first error, so the
// caller can still close them.
//
// NOTE(review): the PerfEventOpen arguments (pid, cpu, group fd, flags) and
// any event type/config are not visible in this view; confirm them against
// the file.
// NOTE(review): when PERF_EVENT_IOC_SET_BPF or PERF_EVENT_IOC_ENABLE fails, the
// fd that was just opened is not in the returned slice yet, so nobody closes
// it. Consider closing it (or appending it) before returning.
func (r *Runner) openPerfEvent(perfFd int) ([]int, error) {
eventAttr := &unix.PerfEventAttr{
Bits: unix.PerfBitFreq,
Sample: uint64(r.dumpFrequency),
Wakeup: 1,
fds := make([]int, 0)
for cpuNum := 0; cpuNum < runtime.NumCPU(); cpuNum++ {
fd, err := unix.PerfEventOpen(
if err != nil {
return fds, err
// attach ebpf to perf event
if err := unix.IoctlSetInt(fd, unix.PERF_EVENT_IOC_SET_BPF, perfFd); err != nil {
return fds, err
// enable perf event
if err := unix.IoctlSetInt(fd, unix.PERF_EVENT_IOC_ENABLE, 0); err != nil {
return fds, err
fds = append(fds, fd)
return fds, nil
func (r *Runner) Stop() error {
var result error
r.base.ShutdownOnce.Do(func() {
for _, fd := range r.perfEventFds {
if err := r.closePerfEvent(fd); err != nil {
result = multierror.Append(result, err)
// wait for all profiling data been consume finished
cancel, cancelFunc := context.WithCancel(context.Background())
r.flushDataNotify = cancelFunc
select {
case <-cancel.Done():
case <-time.After(5 * time.Second):
if r.bpf != nil {
if err := r.bpf.Close(); err != nil {
result = multierror.Append(result, err)
return result
func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) {
var stack Event
var counter uint32
iterate := r.bpf.Counts.Iterate()
stacks := r.bpf.Stacks
result := make([]*v3.EBPFProfilingData, 0)
stackSymbols := make([]uint64, 100)
for iterate.Next(&stack, &counter) {
metadatas := make([]*v3.EBPFProfilingStackMetadata, 0)
// kernel stack
if d := r.base.GenerateProfilingData(r.kernelProfiling, stack.KernelStackID, stacks,
v3.EBPFProfilingStackType_PROCESS_KERNEL_SPACE, stackSymbols); d != nil {
metadatas = append(metadatas, d)
// user stack
if d := r.base.GenerateProfilingData(r.processProfiling, stack.UserStackID, stacks,
v3.EBPFProfilingStackType_PROCESS_USER_SPACE, stackSymbols); d != nil {
metadatas = append(metadatas, d)
if len(metadatas) == 0 {
// update the counters in memory
dumpCount := int32(counter)
existCounter := r.stackCounter[stack]
if existCounter > 0 {
dumpCount -= int32(existCounter)
r.stackCounter[stack] = counter
if dumpCount <= 0 {
result = append(result, &v3.EBPFProfilingData{
Profiling: &v3.EBPFProfilingData_OnCPU{
OnCPU: &v3.EBPFOnCPUProfiling{
Stacks: metadatas,
DumpCount: dumpCount,
// close the flush data notify if exists
if r.flushDataNotify != nil {
return result, nil
func (r *Runner) closePerfEvent(fd int) error {
if fd <= 0 {
return nil
var result error
if err := unix.IoctlSetInt(fd, unix.PERF_EVENT_IOC_DISABLE, 0); err != nil {
result = multierror.Append(result, fmt.Errorf("closing perf event reader: %s", err))
return result