blob: 8c836898f05226851f89dd87bac7f0322aa7bca7 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package core
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"sync/atomic"
"time"
"github.com/apache/skywalking-go/plugins/core/operator"
"github.com/apache/skywalking-go/plugins/core/reporter"
)
// Pprof event types understood by a pprof task. Every value except "cpu" is
// also the name of the matching runtime/pprof profile.
const (
	// Events sampled over the task duration.
	PprofEventsTypeCPU   = "cpu"
	PprofEventsTypeBlock = "block"
	PprofEventsTypeMutex = "mutex"

	// Events captured as a single snapshot when the task stops.
	PprofEventsTypeHeap      = "heap"
	PprofEventsTypeAllocs    = "allocs"
	PprofEventsTypeGoroutine = "goroutine"
	PprofEventsTypeThread    = "threadcreate"
)
// profilingIsRunning is the process-wide "CPU profiler busy" flag, used so
// that only one CPU profiling task runs at a time. A CPU task sets it with
// CompareAndSwap when it starts and clears it when it stops or fails to start.
var profilingIsRunning atomic.Bool
// init installs NewPprofTaskCommand as the reporter package's
// NewPprofTaskCommand factory hook.
// NOTE(review): presumably this lets reporter construct commands without
// importing core directly — confirm against the reporter package.
func init() {
	reporter.NewPprofTaskCommand = NewPprofTaskCommand
}
// PprofTaskCommandImpl is the reporter.PprofTaskCommand implementation built
// by NewPprofTaskCommand. It holds one pprof task's parameters plus the logger
// and reporter used while running it.
type PprofTaskCommandImpl struct {
	// Pprof Task ID
	taskID string
	// Type of profiling (CPU/Heap/Block/Mutex/Goroutine/Threadcreate/Allocs);
	// expected to be one of the PprofEventsType* constants.
	events string
	// How long the task samples; required for CPU, Block and Mutex events.
	// NOTE(review): an earlier comment said "unit is minute", but this is a
	// time.Duration — confirm the caller converts minutes before constructing.
	duration time.Duration
	// Unix timestamp in milliseconds when the task was created
	createTime int64
	// Define the period of the pprof dump, required for Block and Mutex events.
	// Passed as-is to runtime.SetBlockProfileRate / runtime.SetMutexProfileFraction.
	dumpPeriod int
	// Directory for on-disk profile files; empty means the profile is
	// collected in an in-memory buffer instead.
	pprofFilePath string
	// Logger for task progress and errors.
	logger operator.LogOperator
	// Receives the collected profile bytes via ReportPprof.
	manager reporter.PprofReporter
}
// NewPprofTaskCommand creates a command for one pprof task from its parameters
// and returns it through the reporter.PprofTaskCommand interface. No
// validation is done here; callers can use IsInvalidEvent for that.
func NewPprofTaskCommand(taskID, events string, duration time.Duration,
	createTime int64, dumpPeriod int, pprofFilePath string,
	logger operator.LogOperator, manager reporter.PprofReporter) reporter.PprofTaskCommand {
	cmd := new(PprofTaskCommandImpl)
	cmd.taskID, cmd.events = taskID, events
	cmd.duration, cmd.createTime, cmd.dumpPeriod = duration, createTime, dumpPeriod
	cmd.pprofFilePath = pprofFilePath
	cmd.logger, cmd.manager = logger, manager
	return cmd
}
// GetTaskID returns the ID of the pprof task.
func (c *PprofTaskCommandImpl) GetTaskID() string {
	return c.taskID
}

// GetCreateTime returns the task creation time exactly as passed to
// NewPprofTaskCommand.
func (c *PprofTaskCommandImpl) GetCreateTime() int64 {
	return c.createTime
}

// GetDuration returns how long the task is meant to sample.
func (c *PprofTaskCommandImpl) GetDuration() time.Duration {
	return c.duration
}

// GetDumpPeriod returns the sampling rate used for block/mutex profiling.
func (c *PprofTaskCommandImpl) GetDumpPeriod() int {
	return c.dumpPeriod
}
// IsInvalidEvent reports whether the task's event is none of the seven
// supported PprofEventsType* values.
func (c *PprofTaskCommandImpl) IsInvalidEvent() bool {
	switch c.events {
	case PprofEventsTypeCPU, PprofEventsTypeBlock, PprofEventsTypeMutex,
		PprofEventsTypeHeap, PprofEventsTypeAllocs,
		PprofEventsTypeGoroutine, PprofEventsTypeThread:
		return false
	}
	return true
}
// IsDirectSamplingType reports whether the event is captured as a single
// snapshot when the task stops (heap, allocs, goroutine, threadcreate). Such
// events need no setup when the task starts.
func (c *PprofTaskCommandImpl) IsDirectSamplingType() bool {
	switch c.events {
	case PprofEventsTypeHeap, PprofEventsTypeAllocs, PprofEventsTypeGoroutine, PprofEventsTypeThread:
		return true
	default:
		return false
	}
}
// HasDumpPeriod reports whether the event uses dumpPeriod as its runtime
// sampling rate. This is true only for block and mutex profiling.
func (c *PprofTaskCommandImpl) HasDumpPeriod() bool {
	switch c.events {
	case PprofEventsTypeBlock, PprofEventsTypeMutex:
		return true
	default:
		return false
	}
}
// closeFileWriter closes writer if it is an *os.File and logs any close
// failure. Writers of any other type are left untouched.
func (c *PprofTaskCommandImpl) closeFileWriter(writer io.Writer) {
	f, isFile := writer.(*os.File)
	if !isFile {
		return
	}
	if closeErr := f.Close(); closeErr != nil {
		c.logger.Errorf("failed to close pprof file: %v", closeErr)
	}
}
// getWriter returns the destination for the task's profile data.
//
// With an empty pprofFilePath the data is collected in a fresh *bytes.Buffer.
// Otherwise a file named "<taskID>.pprof" is created under pprofFilePath, and
// any missing parent directories are created first. The file is returned as
// an *os.File. On error the writer is nil.
func (c *PprofTaskCommandImpl) getWriter() (io.Writer, error) {
	// sample data to buffer
	if c.pprofFilePath == "" {
		return &bytes.Buffer{}, nil
	}

	// sample data to file.
	// The extension is appended to the task ID rather than passed to
	// filepath.Join as a separate element. Joining them would produce
	// "<taskID>/.pprof": a hidden file inside a per-task subdirectory that
	// outlives the file once it is removed.
	pprofFilePath := filepath.Join(c.pprofFilePath, c.taskID+".pprof")
	if err := os.MkdirAll(filepath.Dir(pprofFilePath), os.ModePerm); err != nil {
		return nil, err
	}
	file, err := os.Create(pprofFilePath)
	if err != nil {
		// Return a literal nil so callers never see a non-nil interface
		// wrapping a nil *os.File.
		return nil, err
	}
	return file, nil
}
// StartTask creates the task's output writer and starts sampling where the
// event type requires it.
//
//   - CPU: claims the process-wide profilingIsRunning flag, then starts the
//     CPU profiler writing into the returned writer.
//   - Block / Mutex: enables the runtime sampler with dumpPeriod as its rate.
//   - Other events: no setup; their data is written when the task stops.
//
// The returned writer is expected to be passed to StopTask. On error no
// profiler is left running and the CPU flag is released. Any file created for
// the task is closed and deleted.
func (c *PprofTaskCommandImpl) StartTask() (io.Writer, error) {
	c.logger.Infof("start pprof task %s", c.taskID)
	isCPU := c.events == PprofEventsTypeCPU

	// Only one CPU profile may be active per process, so claim the slot before
	// creating any output.
	if isCPU && !profilingIsRunning.CompareAndSwap(false, true) {
		return nil, fmt.Errorf("CPU profiling is already running")
	}

	writer, err := c.getWriter()
	if err != nil {
		if isCPU {
			profilingIsRunning.Store(false)
		}
		return nil, err
	}

	switch c.events {
	case PprofEventsTypeCPU:
		if err = pprof.StartCPUProfile(writer); err != nil {
			profilingIsRunning.Store(false)
			// The task failed to start, so presumably StopTask will not run for
			// it. Close and delete the file here, or it would be left on disk.
			if file, ok := writer.(*os.File); ok {
				c.closeFileWriter(file)
				if rmErr := os.Remove(file.Name()); rmErr != nil {
					c.logger.Errorf("failed to remove pprof file %s: %v", file.Name(), rmErr)
				}
			}
			return nil, err
		}
	case PprofEventsTypeBlock:
		runtime.SetBlockProfileRate(c.dumpPeriod)
	case PprofEventsTypeMutex:
		runtime.SetMutexProfileFraction(c.dumpPeriod)
	}
	return writer, nil
}
// StopTask ends the task and reports the profile collected in writer.
//
// For CPU, the profiler is stopped (StopCPUProfile returns only after its
// pending writes complete) and the CPU flag is released. For every other
// supported event, the matching runtime/pprof profile is written to writer.
// Block and mutex sampling is then switched back off. Finally a file-backed
// writer is closed and the data is passed on to readPprofData.
func (c *PprofTaskCommandImpl) StopTask(writer io.Writer) {
	c.logger.Infof("stop pprof task %s", c.taskID)

	// errFormat is also the "write a snapshot" marker. It stays empty for CPU
	// and for unrecognized events, which write nothing here.
	var errFormat string
	switch c.events {
	case PprofEventsTypeCPU:
		pprof.StopCPUProfile()
		profilingIsRunning.Store(false)
	case PprofEventsTypeBlock:
		errFormat = "write Block profile error %v"
	case PprofEventsTypeMutex:
		errFormat = "write Mutex profile error %v"
	case PprofEventsTypeHeap:
		errFormat = "write Heap profile error %v"
	case PprofEventsTypeAllocs:
		errFormat = "write Alloc profile error %v"
	case PprofEventsTypeGoroutine:
		errFormat = "write Goroutine profile error %v"
	case PprofEventsTypeThread:
		errFormat = "write Thread profile error %v"
	}

	if errFormat != "" {
		// Each non-CPU event constant is exactly its runtime/pprof profile name
		// ("block", "mutex", "heap", "allocs", "goroutine", "threadcreate").
		if err := pprof.Lookup(c.events).WriteTo(writer, 0); err != nil {
			c.logger.Errorf(errFormat, err)
		}
	}

	// Turn off the samplers that StartTask enabled, now that the snapshot has
	// been taken.
	switch c.events {
	case PprofEventsTypeBlock:
		runtime.SetBlockProfileRate(0)
	case PprofEventsTypeMutex:
		runtime.SetMutexProfileFraction(0)
	}

	if c.pprofFilePath != "" {
		c.closeFileWriter(writer)
	}
	c.readPprofData(c.taskID, writer)
}
// readPprofData gathers the bytes collected in writer and reports them for
// taskID through c.manager.ReportPprof.
//
// In buffer mode (empty pprofFilePath) the bytes come straight from the
// *bytes.Buffer. In file mode the file is read back by name and then deleted.
// A read failure is logged, and whatever os.ReadFile returned (possibly
// nothing) is still reported. Any other writer/mode combination reports nil.
func (c *PprofTaskCommandImpl) readPprofData(taskID string, writer io.Writer) {
	fileMode := c.pprofFilePath != ""
	var payload []byte

	switch w := writer.(type) {
	case *bytes.Buffer:
		if !fileMode {
			payload = w.Bytes()
		}
	case *os.File:
		if fileMode {
			name := w.Name()
			content, readErr := os.ReadFile(name)
			if readErr != nil {
				c.logger.Errorf("failed to read pprof file %s: %v", name, readErr)
			}
			payload = content
			if rmErr := os.Remove(name); rmErr != nil {
				c.logger.Errorf("failed to remove pprof file %s: %v", name, rmErr)
			}
		}
	}

	c.manager.ReportPprof(taskID, payload)
}