Support pprof profiling (#232)
diff --git a/CHANGES.md b/CHANGES.md
index 10524a6..0825750 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@
* Add recover to goroutine to prevent unexpected panics.
* Add mutex to fix some data race.
* Replace external `goapi` dependency with in-repo generated protocols.
+* Support pprof profiling.
#### Plugins
#### Documentation
diff --git a/agent/core/compile.go b/agent/core/compile.go
index c511ac0..baefdb1 100644
--- a/agent/core/compile.go
+++ b/agent/core/compile.go
@@ -19,17 +19,21 @@
import (
//go:nolint
+ _ "bytes"
_ "encoding/base64"
_ "fmt"
+ _ "io"
_ "log"
_ "math"
_ "math/rand"
_ "net"
_ "os"
+ _ "path/filepath"
_ "reflect"
_ "runtime"
_ "runtime/debug"
_ "runtime/metrics"
+ _ "runtime/pprof"
_ "sort"
_ "strconv"
_ "strings"
@@ -55,4 +59,5 @@
_ "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
+ _ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
)
diff --git a/agent/reporter/imports.go b/agent/reporter/imports.go
index e1c6fd6..90c7e8a 100644
--- a/agent/reporter/imports.go
+++ b/agent/reporter/imports.go
@@ -71,5 +71,6 @@
_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
_ "github.com/apache/skywalking-go/protocols/collect/management/v3"
+ _ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
_ "github.com/apache/skywalking-go/protocols/collect/servicemesh/v3"
)
diff --git a/plugins/core/pprof.go b/plugins/core/pprof.go
new file mode 100644
index 0000000..8c83689
--- /dev/null
+++ b/plugins/core/pprof.go
@@ -0,0 +1,246 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "runtime"
+ "runtime/pprof"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/skywalking-go/plugins/core/operator"
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+)
+
+const (
+ // Pprof event types
+ PprofEventsTypeCPU = "cpu"
+ PprofEventsTypeHeap = "heap"
+ PprofEventsTypeAllocs = "allocs"
+ PprofEventsTypeBlock = "block"
+ PprofEventsTypeMutex = "mutex"
+ PprofEventsTypeThread = "threadcreate"
+ PprofEventsTypeGoroutine = "goroutine"
+)
+
+// CPU profiling state to ensure only one CPU profiling task runs at a time
+var profilingIsRunning atomic.Bool
+
+func init() {
+ reporter.NewPprofTaskCommand = NewPprofTaskCommand
+}
+
+type PprofTaskCommandImpl struct {
+ // Pprof Task ID
+ taskID string
+ // Type of profiling (CPU/Heap/Block/Mutex/Goroutine/Threadcreate/Allocs)
+ events string
+ // Task duration (converted from minutes), required for CPU, Block and Mutex events
+ duration time.Duration
+ // Unix timestamp in milliseconds when the task was created
+ createTime int64
+ // Sampling rate for the pprof dump (passed to runtime.SetBlockProfileRate / SetMutexProfileFraction), required for Block and Mutex events
+ dumpPeriod int
+
+ // for pprof task service
+ pprofFilePath string
+ logger operator.LogOperator
+ manager reporter.PprofReporter
+}
+
+func NewPprofTaskCommand(taskID, events string, duration time.Duration,
+ createTime int64, dumpPeriod int, pprofFilePath string,
+ logger operator.LogOperator, manager reporter.PprofReporter) reporter.PprofTaskCommand {
+ return &PprofTaskCommandImpl{
+ taskID: taskID,
+ events: events,
+ duration: duration,
+ createTime: createTime,
+ dumpPeriod: dumpPeriod,
+ pprofFilePath: pprofFilePath,
+ logger: logger,
+ manager: manager,
+ }
+}
+
+func (c *PprofTaskCommandImpl) GetTaskID() string {
+ return c.taskID
+}
+
+func (c *PprofTaskCommandImpl) GetCreateTime() int64 {
+ return c.createTime
+}
+
+func (c *PprofTaskCommandImpl) GetDuration() time.Duration {
+ return c.duration
+}
+
+func (c *PprofTaskCommandImpl) GetDumpPeriod() int {
+ return c.dumpPeriod
+}
+
+func (c *PprofTaskCommandImpl) IsInvalidEvent() bool {
+ return !(c.events == PprofEventsTypeHeap ||
+ c.events == PprofEventsTypeAllocs ||
+ c.events == PprofEventsTypeGoroutine ||
+ c.events == PprofEventsTypeThread ||
+ c.events == PprofEventsTypeCPU ||
+ c.events == PprofEventsTypeBlock ||
+ c.events == PprofEventsTypeMutex)
+}
+
+func (c *PprofTaskCommandImpl) IsDirectSamplingType() bool {
+ return c.events == PprofEventsTypeHeap ||
+ c.events == PprofEventsTypeAllocs ||
+ c.events == PprofEventsTypeGoroutine ||
+ c.events == PprofEventsTypeThread
+}
+
+func (c *PprofTaskCommandImpl) HasDumpPeriod() bool {
+ return c.events == PprofEventsTypeBlock ||
+ c.events == PprofEventsTypeMutex
+}
+
+func (c *PprofTaskCommandImpl) closeFileWriter(writer io.Writer) {
+ if file, ok := writer.(*os.File); ok {
+ if err := file.Close(); err != nil {
+ c.logger.Errorf("failed to close pprof file: %v", err)
+ }
+ }
+}
+
+func (c *PprofTaskCommandImpl) getWriter() (io.Writer, error) {
+ // no file path configured: collect the sample data into an in-memory buffer
+ if c.pprofFilePath == "" {
+ return &bytes.Buffer{}, nil
+ }
+
+ // collect the sample data into a file under the configured directory
+ pprofFileName := c.taskID + ".pprof"
+ pprofFilePath := filepath.Join(c.pprofFilePath, pprofFileName)
+ if err := os.MkdirAll(filepath.Dir(pprofFilePath), os.ModePerm); err != nil {
+ return nil, err
+ }
+
+ writer, err := os.Create(pprofFilePath)
+ if err != nil {
+ return nil, err
+ }
+
+ return writer, nil
+}
+
+func (c *PprofTaskCommandImpl) StartTask() (io.Writer, error) {
+ c.logger.Infof("start pprof task %s", c.taskID)
+ // For CPU profiling, check global state first
+ if c.events == PprofEventsTypeCPU && !profilingIsRunning.CompareAndSwap(false, true) {
+ return nil, fmt.Errorf("CPU profiling is already running")
+ }
+
+ writer, err := c.getWriter()
+ if err != nil {
+ if c.events == PprofEventsTypeCPU {
+ profilingIsRunning.Store(false)
+ }
+ return nil, err
+ }
+
+ switch c.events {
+ case PprofEventsTypeCPU:
+ if err = pprof.StartCPUProfile(writer); err != nil {
+ profilingIsRunning.Store(false)
+ if c.pprofFilePath != "" {
+ c.closeFileWriter(writer)
+ }
+ return nil, err
+ }
+ case PprofEventsTypeBlock:
+ runtime.SetBlockProfileRate(c.dumpPeriod)
+ case PprofEventsTypeMutex:
+ runtime.SetMutexProfileFraction(c.dumpPeriod)
+ }
+
+ return writer, nil
+}
+
+func (c *PprofTaskCommandImpl) StopTask(writer io.Writer) {
+ c.logger.Infof("stop pprof task %s", c.taskID)
+ switch c.events {
+ case PprofEventsTypeCPU:
+ pprof.StopCPUProfile()
+ profilingIsRunning.Store(false)
+ case PprofEventsTypeBlock:
+ if err := pprof.Lookup("block").WriteTo(writer, 0); err != nil {
+ c.logger.Errorf("write Block profile error %v", err)
+ }
+ runtime.SetBlockProfileRate(0)
+ case PprofEventsTypeMutex:
+ if err := pprof.Lookup("mutex").WriteTo(writer, 0); err != nil {
+ c.logger.Errorf("write Mutex profile error %v", err)
+ }
+ runtime.SetMutexProfileFraction(0)
+ case PprofEventsTypeHeap:
+ if err := pprof.Lookup("heap").WriteTo(writer, 0); err != nil {
+ c.logger.Errorf("write Heap profile error %v", err)
+ }
+ case PprofEventsTypeAllocs:
+ if err := pprof.Lookup("allocs").WriteTo(writer, 0); err != nil {
+ c.logger.Errorf("write Alloc profile error %v", err)
+ }
+ case PprofEventsTypeGoroutine:
+ if err := pprof.Lookup("goroutine").WriteTo(writer, 0); err != nil {
+ c.logger.Errorf("write Goroutine profile error %v", err)
+ }
+ case PprofEventsTypeThread:
+ if err := pprof.Lookup("threadcreate").WriteTo(writer, 0); err != nil {
+ c.logger.Errorf("write Thread profile error %v", err)
+ }
+ }
+
+ if c.pprofFilePath != "" {
+ c.closeFileWriter(writer)
+ }
+ c.readPprofData(c.taskID, writer)
+}
+
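+// readPprofData extracts the collected profile from the in-memory buffer or the temporary file (removing the file afterwards) and hands it to the reporter for upload.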
+func (c *PprofTaskCommandImpl) readPprofData(taskID string, writer io.Writer) {
+ var data []byte
+ if c.pprofFilePath == "" {
+ if buf, ok := writer.(*bytes.Buffer); ok {
+ data = buf.Bytes()
+ }
+ } else {
+ if file, ok := writer.(*os.File); ok {
+ filePath := file.Name()
+ fileData, err := os.ReadFile(filePath)
+ if err != nil {
+ c.logger.Errorf("failed to read pprof file %s: %v", filePath, err)
+ }
+ data = fileData
+ if err := os.Remove(filePath); err != nil {
+ c.logger.Errorf("failed to remove pprof file %s: %v", filePath, err)
+ }
+ }
+ }
+ c.manager.ReportPprof(taskID, data)
+}
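
Not part of the patch — for context, the standalone sketch below shows the underlying runtime/pprof calls that StartTask and StopTask wrap when no pprof_file_path is configured (profile data collected into an in-memory buffer). Package and variable names here are illustrative only; in the patch the buffer (or temporary file) contents are then handed to PprofTaskManager.ReportPprof for upload.

package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
	"time"
)

func main() {
	// duration-based sampling, as in the CPU branch of StartTask/StopTask
	var cpuBuf bytes.Buffer
	if err := pprof.StartCPUProfile(&cpuBuf); err != nil {
		fmt.Println("start cpu profile:", err)
		return
	}
	time.Sleep(2 * time.Second) // stands in for the task duration
	pprof.StopCPUProfile()
	fmt.Printf("cpu profile: %d bytes\n", cpuBuf.Len())

	// direct sampling, as in the heap/allocs/goroutine/threadcreate branches of StopTask
	var heapBuf bytes.Buffer
	if err := pprof.Lookup("heap").WriteTo(&heapBuf, 0); err != nil {
		fmt.Println("write heap profile:", err)
		return
	}
	fmt.Printf("heap profile: %d bytes\n", heapBuf.Len())
}
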
diff --git a/plugins/core/reporter/grpc/grpc.go b/plugins/core/reporter/grpc/grpc.go
index 6734ea7..9d2ba5d 100644
--- a/plugins/core/reporter/grpc/grpc.go
+++ b/plugins/core/reporter/grpc/grpc.go
@@ -42,17 +42,19 @@
checkInterval time.Duration,
connManager *reporter.ConnectionManager,
cdsManager *reporter.CDSManager,
+ pprofTaskManager *reporter.PprofTaskManager,
opts ...ReporterOption,
) (reporter.Reporter, error) {
r := &gRPCReporter{
- logger: logger,
- serverAddr: serverAddr,
- tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
- metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
- logSendCh: make(chan *logv3.LogData, maxSendQueueSize),
- checkInterval: checkInterval,
- connManager: connManager,
- cdsManager: cdsManager,
+ logger: logger,
+ serverAddr: serverAddr,
+ tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
+ metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
+ logSendCh: make(chan *logv3.LogData, maxSendQueueSize),
+ checkInterval: checkInterval,
+ connManager: connManager,
+ cdsManager: cdsManager,
+ pprofTaskManager: pprofTaskManager,
}
for _, o := range opts {
o(r)
@@ -83,10 +85,11 @@
checkInterval time.Duration
// bootFlag is set if Boot be executed
- bootFlag bool
- transform *reporter.Transform
- connManager *reporter.ConnectionManager
- cdsManager *reporter.CDSManager
+ bootFlag bool
+ transform *reporter.Transform
+ connManager *reporter.ConnectionManager
+ cdsManager *reporter.CDSManager
+ pprofTaskManager *reporter.PprofTaskManager
}
func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
@@ -95,6 +98,7 @@
r.initSendPipeline()
r.check()
r.cdsManager.InitCDS(entity, cdsWatchers)
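+ // start polling the backend for pprof profiling task commands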
+ r.pprofTaskManager.InitPprofTask(entity)
r.bootFlag = true
}
diff --git a/plugins/core/reporter/pprof_manager.go b/plugins/core/reporter/pprof_manager.go
new file mode 100644
index 0000000..e3d5d9d
--- /dev/null
+++ b/plugins/core/reporter/pprof_manager.go
@@ -0,0 +1,349 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reporter
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strconv"
+ "time"
+
+ "github.com/apache/skywalking-go/plugins/core/operator"
+ commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
+ pprofv10 "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
+)
+
+const (
+ // max chunk size for pprof data
+ maxChunkSize = 1 * 1024 * 1024
+ // max send queue size for pprof data
+ maxPprofSendQueueSize = 30000
+ // max duration for pprof task
+ pprofTaskDurationMaxMinute = 15 * time.Minute
+)
+
+type PprofTaskCommand interface {
+ GetTaskID() string
+ GetCreateTime() int64
+ GetDuration() time.Duration
+ GetDumpPeriod() int
+ StartTask() (io.Writer, error)
+ StopTask(io.Writer)
+ IsDirectSamplingType() bool
+ IsInvalidEvent() bool
+ HasDumpPeriod() bool
+}
+type PprofReporter interface {
+ ReportPprof(taskID string, content []byte)
+}
+
+var NewPprofTaskCommand func(taskID, events string, duration time.Duration,
+ createTime int64, dumpPeriod int, pprofFilePath string,
+ logger operator.LogOperator, manager PprofReporter) PprofTaskCommand
+
+type PprofTaskManager struct {
+ logger operator.LogOperator
+ serverAddr string
+ pprofInterval time.Duration
+ PprofClient pprofv10.PprofTaskClient // for grpc
+ connManager *ConnectionManager
+ entity *Entity
+ pprofFilePath string
+ LastUpdateTime int64
+ commands PprofTaskCommand
+ pprofSendCh chan *pprofv10.PprofData
+}
+
+func NewPprofTaskManager(logger operator.LogOperator, serverAddr string,
+ pprofInterval time.Duration, connManager *ConnectionManager,
+ pprofFilePath string) (*PprofTaskManager, error) {
+ if pprofInterval <= 0 {
+ logger.Errorf("pprof interval less than zero, pprof profiling is disabled")
+ return nil, fmt.Errorf("pprof interval less than zero, pprof profiling is disabled")
+ }
+ pprofManager := &PprofTaskManager{
+ logger: logger,
+ serverAddr: serverAddr,
+ pprofInterval: pprofInterval,
+ connManager: connManager,
+ pprofFilePath: pprofFilePath,
+ pprofSendCh: make(chan *pprofv10.PprofData, maxPprofSendQueueSize),
+ }
+ conn, err := connManager.GetConnection(serverAddr)
+ if err != nil {
+ return nil, err
+ }
+ pprofManager.PprofClient = pprofv10.NewPprofTaskClient(conn)
+ pprofManager.commands = nil
+ return pprofManager, nil
+}
+
+func (r *PprofTaskManager) InitPprofTask(entity *Entity) {
+ r.entity = entity
+ r.initPprofSendPipeline()
+ go func() {
+ for {
+ switch r.connManager.GetConnectionStatus(r.serverAddr) {
+ case ConnectionStatusShutdown:
+ return
+ case ConnectionStatusDisconnect:
+ time.Sleep(r.pprofInterval)
+ continue
+ }
+ pprofCommand, err := r.PprofClient.GetPprofTaskCommands(context.Background(), &pprofv10.PprofTaskCommandQuery{
+ Service: r.entity.ServiceName,
+ ServiceInstance: r.entity.ServiceInstanceName,
+ LastCommandTime: r.LastUpdateTime,
+ })
+ if err != nil {
+ r.logger.Errorf("fetch pprof task commands error %v", err)
+ time.Sleep(r.pprofInterval)
+ continue
+ }
+
+ if len(pprofCommand.GetCommands()) > 0 && pprofCommand.GetCommands()[0].Command == "PprofTaskQuery" {
+ rawCommand := pprofCommand.GetCommands()[0]
+ r.HandleCommand(rawCommand)
+ }
+
+ time.Sleep(r.pprofInterval)
+ }
+ }()
+}
+
+func (r *PprofTaskManager) HandleCommand(rawCommand *commonv3.Command) {
+ command := r.deserializePprofTaskCommand(rawCommand)
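+ // only handle commands that are newer than the last processed one; stale or duplicate commands are ignored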
+ if command.GetCreateTime() > r.LastUpdateTime {
+ r.LastUpdateTime = command.GetCreateTime()
+ } else {
+ return
+ }
+ if err := r.checkCommand(command); err != nil {
+ r.logger.Errorf("check command error, cannot process this pprof task. reason: %v", err)
+ return
+ }
+
+ if command.IsDirectSamplingType() {
+ // direct sampling of Heap, Allocs, Goroutine, Thread
+ writer, err := command.StartTask()
+ if err != nil {
+ r.logger.Errorf("start %s pprof task error %v \n", command.GetTaskID(), err)
+ return
+ }
+ command.StopTask(writer)
+ } else {
+ // The CPU, Block and Mutex sampling lasts for a duration and then stops
+ writer, err := command.StartTask()
+ if err != nil {
+ r.logger.Errorf("start %s pprof task error %v \n", command.GetTaskID(), err)
+ return
+ }
+ time.AfterFunc(command.GetDuration(), func() {
+ command.StopTask(writer)
+ })
+ }
+}
+
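+// deserializePprofTaskCommand builds a pprof task command from the raw key-value command args; the expected keys are TaskId, Events, Duration (in minutes), DumpPeriod and CreateTime (millisecond timestamp).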
+func (r *PprofTaskManager) deserializePprofTaskCommand(command *commonv3.Command) PprofTaskCommand {
+ args := command.Args
+ taskID := ""
+ events := ""
+ duration := 0
+ dumpPeriod := 0 // defaults to 0 when no explicit value is provided
+ var createTime int64 = 0
+ for _, pair := range args {
+ if pair.GetKey() == "TaskId" {
+ taskID = pair.GetValue()
+ } else if pair.GetKey() == "Events" {
+ events = pair.GetValue()
+ } else if pair.GetKey() == "Duration" {
+ if val, err := strconv.Atoi(pair.GetValue()); err == nil && val > 0 {
+ duration = val
+ }
+ } else if pair.GetKey() == "DumpPeriod" {
+ if val, err := strconv.Atoi(pair.GetValue()); err == nil && val >= 0 {
+ dumpPeriod = val
+ }
+ } else if pair.GetKey() == "CreateTime" {
+ createTime, _ = strconv.ParseInt(pair.GetValue(), 10, 64)
+ }
+ }
+
+ return NewPprofTaskCommand(
+ taskID,
+ events,
+ time.Duration(duration)*time.Minute,
+ createTime,
+ dumpPeriod,
+ r.pprofFilePath,
+ r.logger,
+ r,
+ )
+}
+
+func (r *PprofTaskManager) checkCommand(command PprofTaskCommand) error {
+ if command.GetTaskID() == "" {
+ return fmt.Errorf("pprof task id cannot be empty, task id is %s", command.GetTaskID())
+ }
+ if command.IsInvalidEvent() {
+ return fmt.Errorf("pprof task event is invalid, task id is %s", command.GetTaskID())
+ }
+ if !command.IsDirectSamplingType() {
+ if command.GetDuration() <= 0 || command.GetDuration() > pprofTaskDurationMaxMinute {
+ return fmt.Errorf("pprof task duration must be between 0 and %v, task id is %s", pprofTaskDurationMaxMinute, command.GetTaskID())
+ }
+ }
+ if command.HasDumpPeriod() && command.GetDumpPeriod() <= 0 {
+ return fmt.Errorf("pprof task dumpperiod must be greater than 0, task id is %s", command.GetTaskID())
+ }
+ return nil
+}
+
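+// ReportPprof queues the collected profile data for asynchronous upload; when the send buffer is full the data is dropped and an error is logged.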
+func (r *PprofTaskManager) ReportPprof(taskID string, content []byte) {
+ metaData := &pprofv10.PprofMetaData{
+ Service: r.entity.ServiceName,
+ ServiceInstance: r.entity.ServiceInstanceName,
+ TaskId: taskID,
+ Type: pprofv10.PprofProfilingStatus_PPROF_PROFILING_SUCCESS,
+ ContentSize: int32(len(content)),
+ }
+
+ pprofData := &pprofv10.PprofData{
+ Metadata: metaData,
+ Result: &pprofv10.PprofData_Content{
+ Content: content,
+ },
+ }
+
+ select {
+ case r.pprofSendCh <- pprofData:
+ default:
+ r.logger.Errorf("reach max pprof send buffer")
+ }
+}
+
+func (r *PprofTaskManager) initPprofSendPipeline() {
+ go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ r.logger.Errorf("PprofTaskManager initPprofSendPipeline panic err %v", err)
+ }
+ }()
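+ // wait for a live connection, then drain pprofSendCh; the goroutine exits when the connection is shut down or the channel is closed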
+ StreamLoop:
+ for {
+ switch r.connManager.GetConnectionStatus(r.serverAddr) {
+ case ConnectionStatusShutdown:
+ return
+ case ConnectionStatusDisconnect:
+ time.Sleep(5 * time.Second)
+ continue StreamLoop
+ }
+
+ for pprofData := range r.pprofSendCh {
+ r.uploadPprofData(pprofData)
+ }
+ break
+ }
+ }()
+}
+
+func (r *PprofTaskManager) uploadPprofData(pprofData *pprofv10.PprofData) {
+ ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancel()
+
+ stream, err := r.PprofClient.Collect(ctx)
+ if err != nil {
+ r.logger.Errorf("failed to start collect stream: %v", err)
+ return
+ }
+
+ // Send metadata first
+ metadataMsg := &pprofv10.PprofData{
+ Metadata: pprofData.Metadata,
+ }
+ if err = stream.Send(metadataMsg); err != nil {
+ r.logger.Errorf("failed to send metadata: %v", err)
+ return
+ }
+
+ resp, err := stream.Recv()
+ if err != nil {
+ r.logger.Errorf("failed to receive server response: %v", err)
+ return
+ }
+
+ switch resp.Status {
+ case pprofv10.PprofProfilingStatus_PPROF_TERMINATED_BY_OVERSIZE:
+ r.logger.Errorf("pprof is too large to be received by the oap server")
+ return
+ case pprofv10.PprofProfilingStatus_PPROF_EXECUTION_TASK_ERROR:
+ r.logger.Errorf("server rejected pprof upload due to execution task error")
+ return
+ }
+
+ // Upload content in chunks
+ content := pprofData.GetContent()
+ chunkCount := 0
+ contentSize := len(content)
+
+ for offset := 0; offset < contentSize; offset += maxChunkSize {
+ end := offset + maxChunkSize
+ if end > contentSize {
+ end = contentSize
+ }
+
+ chunkData := &pprofv10.PprofData{
+ Result: &pprofv10.PprofData_Content{
+ Content: content[offset:end],
+ },
+ }
+
+ if err := stream.Send(chunkData); err != nil {
+ r.logger.Errorf("failed to send pprof chunk %d: %v", chunkCount, err)
+ return
+ }
+ chunkCount++
+ // Check context timeout
+ select {
+ case <-ctx.Done():
+ r.logger.Errorf("context timeout during chunk upload for task %s", pprofData.Metadata.TaskId)
+ return
+ default:
+ }
+ }
+
+ r.closePprofStream(stream)
+}
+
+func (r *PprofTaskManager) closePprofStream(stream pprofv10.PprofTask_CollectClient) {
+ if err := stream.CloseSend(); err != nil {
+ r.logger.Errorf("failed to close send stream: %v", err)
+ return
+ }
+
+ for {
+ _, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ r.logger.Errorf("error receiving final response %v", err)
+ break
+ }
+ }
+}
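
Not part of the patch — a standalone sketch of the chunking arithmetic used by uploadPprofData above (at most 1 MiB per streamed message, matching maxChunkSize); the chunk helper name is illustrative only.

package main

import "fmt"

const maxChunkSize = 1 * 1024 * 1024 // 1 MiB, matching the constant above

// chunk splits content into slices of at most maxChunkSize bytes,
// mirroring the offset loop in uploadPprofData.
func chunk(content []byte) [][]byte {
	var chunks [][]byte
	for offset := 0; offset < len(content); offset += maxChunkSize {
		end := offset + maxChunkSize
		if end > len(content) {
			end = len(content)
		}
		chunks = append(chunks, content[offset:end])
	}
	return chunks
}

func main() {
	data := make([]byte, 2*maxChunkSize+123)
	fmt.Println("chunks:", len(chunk(data))) // prints 3
}
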
diff --git a/test/plugins/scenarios/logrus/config/excepted.yml b/test/plugins/scenarios/logrus/config/excepted.yml
index 983c4aa..88bfca1 100644
--- a/test/plugins/scenarios/logrus/config/excepted.yml
+++ b/test/plugins/scenarios/logrus/config/excepted.yml
@@ -18,14 +18,23 @@
meterItems: []
logItems:
- serviceName: logrus
- logSize: ge 3
+ logSize: ge 4
logs:
- timestamp: nq 0
endpoint: ''
body:
type: TEXT
- content: { text: 'fetch dynamic configuration error rpc error: code = Unimplemented
- desc = Method not found: skywalking.v3.ConfigurationDiscoveryService/fetchConfigurations' }
+ content: { text: not null }
+ traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
+ tags:
+ data:
+ - { key: LEVEL, value: error }
+ layer: GENERAL
+ - timestamp: nq 0
+ endpoint: ''
+ body:
+ type: TEXT
+ content: { text: not null }
traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
tags:
data:
diff --git a/test/plugins/scenarios/zap/config/excepted.yml b/test/plugins/scenarios/zap/config/excepted.yml
index 0b927f9..dd1b2d8 100644
--- a/test/plugins/scenarios/zap/config/excepted.yml
+++ b/test/plugins/scenarios/zap/config/excepted.yml
@@ -18,14 +18,23 @@
meterItems: []
logItems:
- serviceName: zap
- logSize: ge 3
+ logSize: ge 4
logs:
- timestamp: nq 0
endpoint: ''
body:
type: TEXT
- content: { text: 'fetch dynamic configuration error rpc error: code = Unimplemented
- desc = Method not found: skywalking.v3.ConfigurationDiscoveryService/fetchConfigurations' }
+ content: { text: not null }
+ traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
+ tags:
+ data:
+ - { key: LEVEL, value: error }
+ layer: GENERAL
+ - timestamp: nq 0
+ endpoint: ''
+ body:
+ type: TEXT
+ content: { text: not null }
traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
tags:
data:
diff --git a/tools/go-agent/config/agent.default.yaml b/tools/go-agent/config/agent.default.yaml
index f07f22d..02d351f 100644
--- a/tools/go-agent/config/agent.default.yaml
+++ b/tools/go-agent/config/agent.default.yaml
@@ -54,6 +54,12 @@
authentication: ${SW_AGENT_REPORTER_GRPC_AUTHENTICATION:}
# The interval(s) of fetching dynamic configuration from backend.
cds_fetch_interval: ${SW_AGENT_REPORTER_GRPC_CDS_FETCH_INTERVAL:20}
+ pprof:
+ # The interval(s) of fetching pprof tasks from backend.
+ pprof_fetch_interval: ${SW_AGENT_REPORTER_GRPC_PPROF_TASK_FETCH_INTERVAL:20}
+ # The directory used to store pprof files generated when executing a profiling task. When empty, profile data is kept in memory.
+ pprof_file_path: ${SW_AGENT_REPORTER_GRPC_PROFILE_PPROF_FILE_PATH:}
+
tls:
# Whether to enable TLS with backend.
enable: ${SW_AGENT_REPORTER_GRPC_TLS_ENABLE:false}
diff --git a/tools/go-agent/config/loader.go b/tools/go-agent/config/loader.go
index adc2a11..8d843df 100644
--- a/tools/go-agent/config/loader.go
+++ b/tools/go-agent/config/loader.go
@@ -83,11 +83,17 @@
}
type GRPCReporter struct {
- BackendService StringValue `yaml:"backend_service"`
- MaxSendQueue StringValue `yaml:"max_send_queue"`
- Authentication StringValue `yaml:"authentication"`
- CDSFetchInterval StringValue `yaml:"cds_fetch_interval"`
- TLS GRPCReporterTLS `yaml:"tls"`
+ BackendService StringValue `yaml:"backend_service"`
+ MaxSendQueue StringValue `yaml:"max_send_queue"`
+ Authentication StringValue `yaml:"authentication"`
+ CDSFetchInterval StringValue `yaml:"cds_fetch_interval"`
+ TLS GRPCReporterTLS `yaml:"tls"`
+ Pprof GRPCReporterPprof `yaml:"pprof"`
+}
+
+type GRPCReporterPprof struct {
+ PprofFetchInterval StringValue `yaml:"pprof_fetch_interval"`
+ PprofFilePath StringValue `yaml:"pprof_file_path"`
}
type GRPCReporterTLS struct {
diff --git a/tools/go-agent/instrument/reporter/instrument.go b/tools/go-agent/instrument/reporter/instrument.go
index 68409e6..43bb06a 100644
--- a/tools/go-agent/instrument/reporter/instrument.go
+++ b/tools/go-agent/instrument/reporter/instrument.go
@@ -160,7 +160,7 @@
reporterInitTemplate := baseReporterInitTemplate
if reporterType == consts.KafkaReporter {
reporterInitTemplate += `
- _, cdsManager, err := initManager(logger, checkInterval)
+ _, cdsManager, _, err := initManager(logger, checkInterval)
if err != nil {
return nil, err
}
@@ -169,11 +169,11 @@
reporterInitTemplate += kafkaReporterInitFunc
} else {
reporterInitTemplate += `
- connManager, cdsManager, err := initManager(logger, checkInterval)
+ connManager, cdsManager, pprofTaskManager, err := initManager(logger, checkInterval)
if err != nil {
return nil, err
}
- return initGRPCReporter(logger, checkInterval, connManager, cdsManager)
+ return initGRPCReporter(logger, checkInterval, connManager, cdsManager, pprofTaskManager)
}`
reporterInitTemplate += grpcReporterInitFunc
}
@@ -208,7 +208,7 @@
const initManagerFunc = `
-func initManager(logger operator.LogOperator, checkInterval time.Duration) (*ConnectionManager, *CDSManager, error) {
+func initManager(logger operator.LogOperator, checkInterval time.Duration) (*ConnectionManager, *CDSManager, *PprofTaskManager, error) {
authenticationVal := {{.Config.Reporter.GRPC.Authentication.ToGoStringValue}}
backendServiceVal := {{.Config.Reporter.GRPC.BackendService.ToGoStringValue}}
@@ -229,16 +229,25 @@
connManager, err = NewConnectionManager(logger, checkInterval, backendServiceVal, authenticationVal, nil)
}
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
cdsFetchIntervalVal := {{.Config.Reporter.GRPC.CDSFetchInterval.ToGoIntValue "the cds fetch interval must be number"}}
cdsFetchInterval := time.Second * time.Duration(cdsFetchIntervalVal)
cdsManager, err := NewCDSManager(logger, backendServiceVal, cdsFetchInterval, connManager)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
- return connManager, cdsManager, nil
+
+ pprofFetchIntervalVal := {{.Config.Reporter.GRPC.Pprof.PprofFetchInterval.ToGoIntValue "the pprof fetch interval must be number"}}
+ pprofFetchInterval := time.Second * time.Duration(pprofFetchIntervalVal)
+ pprofFilePath := {{.Config.Reporter.GRPC.Pprof.PprofFilePath.ToGoStringValue}}
+ pprofTaskManager, err := NewPprofTaskManager(logger, backendServiceVal, pprofFetchInterval, connManager, pprofFilePath)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ return connManager, cdsManager, pprofTaskManager, nil
}
`
@@ -247,13 +256,14 @@
func initGRPCReporter(logger operator.LogOperator,
checkInterval time.Duration,
connManager *ConnectionManager,
- cdsManager *CDSManager) (Reporter, error) {
+ cdsManager *CDSManager,
+ pprofTaskManager *PprofTaskManager) (Reporter, error) {
var opts []ReporterOption
maxSendQueueVal := {{.Config.Reporter.GRPC.MaxSendQueue.ToGoIntValue "the GRPC reporter max queue size must be number"}}
opts = append(opts, WithMaxSendQueueSize(maxSendQueueVal))
backendServiceVal := {{.Config.Reporter.GRPC.BackendService.ToGoStringValue}}
- return NewGRPCReporter(logger, backendServiceVal, checkInterval, connManager, cdsManager, opts...)
+ return NewGRPCReporter(logger, backendServiceVal, checkInterval, connManager, cdsManager, pprofTaskManager, opts...)
}
`