Support pprof profiling (#232)

diff --git a/CHANGES.md b/CHANGES.md
index 10524a6..0825750 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@
 * Add recover to goroutine to prevent unexpected panics.
 * Add mutex to fix some data race. 
 * Replace external `goapi` dependency with in-repo generated protocols. 
+* Support pprof profiling. 
 #### Plugins
 
 #### Documentation
diff --git a/agent/core/compile.go b/agent/core/compile.go
index c511ac0..baefdb1 100644
--- a/agent/core/compile.go
+++ b/agent/core/compile.go
@@ -19,17 +19,21 @@
 
 import (
 	//go:nolint
+	_ "bytes"
 	_ "encoding/base64"
 	_ "fmt"
+	_ "io"
 	_ "log"
 	_ "math"
 	_ "math/rand"
 	_ "net"
 	_ "os"
+	_ "path/filepath"
 	_ "reflect"
 	_ "runtime"
 	_ "runtime/debug"
 	_ "runtime/metrics"
+	_ "runtime/pprof"
 	_ "sort"
 	_ "strconv"
 	_ "strings"
@@ -55,4 +59,5 @@
 	_ "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
 	_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
 	_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
+	_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
 )
diff --git a/agent/reporter/imports.go b/agent/reporter/imports.go
index e1c6fd6..90c7e8a 100644
--- a/agent/reporter/imports.go
+++ b/agent/reporter/imports.go
@@ -71,5 +71,6 @@
 	_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
 	_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
 	_ "github.com/apache/skywalking-go/protocols/collect/management/v3"
+	_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
 	_ "github.com/apache/skywalking-go/protocols/collect/servicemesh/v3"
 )
diff --git a/plugins/core/pprof.go b/plugins/core/pprof.go
new file mode 100644
index 0000000..8c83689
--- /dev/null
+++ b/plugins/core/pprof.go
@@ -0,0 +1,246 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"runtime"
+	"runtime/pprof"
+	"sync/atomic"
+	"time"
+
+	"github.com/apache/skywalking-go/plugins/core/operator"
+	"github.com/apache/skywalking-go/plugins/core/reporter"
+)
+
+const (
+	// Pprof event types
+	PprofEventsTypeCPU       = "cpu"
+	PprofEventsTypeHeap      = "heap"
+	PprofEventsTypeAllocs    = "allocs"
+	PprofEventsTypeBlock     = "block"
+	PprofEventsTypeMutex     = "mutex"
+	PprofEventsTypeThread    = "threadcreate"
+	PprofEventsTypeGoroutine = "goroutine"
+)
+
+// CPU profiling state to ensure only one CPU profiling task runs at a time
+var profilingIsRunning atomic.Bool
+
+func init() {
+	reporter.NewPprofTaskCommand = NewPprofTaskCommand
+}
+
+type PprofTaskCommandImpl struct {
+	// Pprof task ID, also used to name the pprof file in file mode
+	taskID string
+	// Type of profiling, compared against the PprofEventsType* constants
+	events string
+	// Sampling duration, required for CPU, Block and Mutex events
+	duration time.Duration
+	// Unix timestamp in milliseconds when the task was created
+	createTime int64
+	// Rate passed to runtime.SetBlockProfileRate (Block) or runtime.SetMutexProfileFraction (Mutex)
+	dumpPeriod int
+
+	// for pprof task service; an empty pprofFilePath samples into an in-memory buffer
+	pprofFilePath string
+	logger        operator.LogOperator
+	manager       reporter.PprofReporter
+}
+
+func NewPprofTaskCommand(taskID, events string, duration time.Duration,
+	createTime int64, dumpPeriod int, pprofFilePath string,
+	logger operator.LogOperator, manager reporter.PprofReporter) reporter.PprofTaskCommand {
+	return &PprofTaskCommandImpl{
+		taskID:        taskID,
+		events:        events,
+		duration:      duration,
+		createTime:    createTime,
+		dumpPeriod:    dumpPeriod,
+		pprofFilePath: pprofFilePath,
+		logger:        logger,
+		manager:       manager,
+	}
+}
+
+func (c *PprofTaskCommandImpl) GetTaskID() string {
+	return c.taskID
+}
+
+func (c *PprofTaskCommandImpl) GetCreateTime() int64 {
+	return c.createTime
+}
+
+func (c *PprofTaskCommandImpl) GetDuration() time.Duration {
+	return c.duration
+}
+
+func (c *PprofTaskCommandImpl) GetDumpPeriod() int {
+	return c.dumpPeriod
+}
+
+// IsInvalidEvent reports whether the task's event is none of the PprofEventsType* values.
+func (c *PprofTaskCommandImpl) IsInvalidEvent() bool {
+	switch c.events {
+	case PprofEventsTypeHeap, PprofEventsTypeAllocs, PprofEventsTypeGoroutine, PprofEventsTypeThread,
+		PprofEventsTypeCPU, PprofEventsTypeBlock, PprofEventsTypeMutex:
+		return false
+	}
+	return true
+}
+
+func (c *PprofTaskCommandImpl) IsDirectSamplingType() bool {
+	return c.events == PprofEventsTypeHeap ||
+		c.events == PprofEventsTypeAllocs ||
+		c.events == PprofEventsTypeGoroutine ||
+		c.events == PprofEventsTypeThread
+}
+
+func (c *PprofTaskCommandImpl) HasDumpPeriod() bool {
+	return c.events == PprofEventsTypeBlock ||
+		c.events == PprofEventsTypeMutex
+}
+
+func (c *PprofTaskCommandImpl) closeFileWriter(writer io.Writer) {
+	if file, ok := writer.(*os.File); ok {
+		if err := file.Close(); err != nil {
+			c.logger.Errorf("failed to close pprof file: %v", err)
+		}
+	}
+}
+
+// getWriter returns the sink for the sampled profile: an in-memory buffer when
+// no pprof file path is configured, otherwise a newly created "<taskID>.pprof"
+// file under that path (missing parent directories are created first).
+func (c *PprofTaskCommandImpl) getWriter() (io.Writer, error) {
+	// sample data to buffer
+	if c.pprofFilePath == "" {
+		return &bytes.Buffer{}, nil
+	}
+	// sample data to file; the extension is appended to the task ID, because
+	// joining it as a separate path element would yield "<taskID>/.pprof"
+	pprofFilePath := filepath.Join(c.pprofFilePath, c.taskID+".pprof")
+	if err := os.MkdirAll(filepath.Dir(pprofFilePath), os.ModePerm); err != nil {
+		return nil, err
+	}
+	writer, err := os.Create(pprofFilePath)
+	if err != nil {
+		return nil, err
+	}
+	return writer, nil
+}
+
+func (c *PprofTaskCommandImpl) StartTask() (io.Writer, error) {
+	c.logger.Infof("start pprof task %s", c.taskID)
+	// For CPU profiling, check global state first
+	if c.events == PprofEventsTypeCPU && !profilingIsRunning.CompareAndSwap(false, true) {
+		return nil, fmt.Errorf("CPU profiling is already running")
+	}
+
+	writer, err := c.getWriter()
+	if err != nil {
+		if c.events == PprofEventsTypeCPU {
+			profilingIsRunning.Store(false)
+		}
+		return nil, err
+	}
+
+	switch c.events {
+	case PprofEventsTypeCPU:
+		if err = pprof.StartCPUProfile(writer); err != nil {
+			profilingIsRunning.Store(false)
+			if c.pprofFilePath != "" {
+				c.closeFileWriter(writer)
+			}
+			return nil, err
+		}
+	case PprofEventsTypeBlock:
+		runtime.SetBlockProfileRate(c.dumpPeriod)
+	case PprofEventsTypeMutex:
+		runtime.SetMutexProfileFraction(c.dumpPeriod)
+	}
+
+	return writer, nil
+}
+
+func (c *PprofTaskCommandImpl) StopTask(writer io.Writer) {
+	c.logger.Infof("stop pprof task %s", c.taskID)
+	switch c.events {
+	case PprofEventsTypeCPU:
+		pprof.StopCPUProfile()
+		profilingIsRunning.Store(false)
+	case PprofEventsTypeBlock:
+		if err := pprof.Lookup("block").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Block profile error %v", err)
+		}
+		runtime.SetBlockProfileRate(0)
+	case PprofEventsTypeMutex:
+		if err := pprof.Lookup("mutex").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Mutex profile error %v", err)
+		}
+		runtime.SetMutexProfileFraction(0)
+	case PprofEventsTypeHeap:
+		if err := pprof.Lookup("heap").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Heap profile error %v", err)
+		}
+	case PprofEventsTypeAllocs:
+		if err := pprof.Lookup("allocs").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Alloc profile error %v", err)
+		}
+	case PprofEventsTypeGoroutine:
+		if err := pprof.Lookup("goroutine").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Goroutine profile error %v", err)
+		}
+	case PprofEventsTypeThread:
+		if err := pprof.Lookup("threadcreate").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Thread profile error %v", err)
+		}
+	}
+
+	if c.pprofFilePath != "" {
+		c.closeFileWriter(writer)
+	}
+	c.readPprofData(c.taskID, writer)
+}
+
+// readPprofData gathers the sampled profile and reports it for taskID. In
+// buffer mode the bytes come from the *bytes.Buffer; in file mode the file is
+// read back by name and then removed. The data stays nil when the writer type
+// does not match the configured mode.
+func (c *PprofTaskCommandImpl) readPprofData(taskID string, writer io.Writer) {
+	var data []byte
+	if buf, isBuf := writer.(*bytes.Buffer); isBuf && c.pprofFilePath == "" {
+		data = buf.Bytes()
+	} else if file, isFile := writer.(*os.File); isFile && c.pprofFilePath != "" {
+		filePath := file.Name()
+		contents, readErr := os.ReadFile(filePath)
+		if readErr != nil {
+			c.logger.Errorf("failed to read pprof file %s: %v", filePath, readErr)
+		}
+		data = contents
+		if rmErr := os.Remove(filePath); rmErr != nil {
+			c.logger.Errorf("failed to remove pprof file %s: %v", filePath, rmErr)
+		}
+	}
+	c.manager.ReportPprof(taskID, data)
+}
diff --git a/plugins/core/reporter/grpc/grpc.go b/plugins/core/reporter/grpc/grpc.go
index 6734ea7..9d2ba5d 100644
--- a/plugins/core/reporter/grpc/grpc.go
+++ b/plugins/core/reporter/grpc/grpc.go
@@ -42,17 +42,19 @@
 	checkInterval time.Duration,
 	connManager *reporter.ConnectionManager,
 	cdsManager *reporter.CDSManager,
+	pprofTaskManager *reporter.PprofTaskManager,
 	opts ...ReporterOption,
 ) (reporter.Reporter, error) {
 	r := &gRPCReporter{
-		logger:        logger,
-		serverAddr:    serverAddr,
-		tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
-		metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
-		logSendCh:     make(chan *logv3.LogData, maxSendQueueSize),
-		checkInterval: checkInterval,
-		connManager:   connManager,
-		cdsManager:    cdsManager,
+		logger:           logger,
+		serverAddr:       serverAddr,
+		tracingSendCh:    make(chan *agentv3.SegmentObject, maxSendQueueSize),
+		metricsSendCh:    make(chan []*agentv3.MeterData, maxSendQueueSize),
+		logSendCh:        make(chan *logv3.LogData, maxSendQueueSize),
+		checkInterval:    checkInterval,
+		connManager:      connManager,
+		cdsManager:       cdsManager,
+		pprofTaskManager: pprofTaskManager,
 	}
 	for _, o := range opts {
 		o(r)
@@ -83,10 +85,11 @@
 	checkInterval    time.Duration
 
 	// bootFlag is set if Boot be executed
-	bootFlag    bool
-	transform   *reporter.Transform
-	connManager *reporter.ConnectionManager
-	cdsManager  *reporter.CDSManager
+	bootFlag         bool
+	transform        *reporter.Transform
+	connManager      *reporter.ConnectionManager
+	cdsManager       *reporter.CDSManager
+	pprofTaskManager *reporter.PprofTaskManager
 }
 
 func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
@@ -95,6 +98,7 @@
 	r.initSendPipeline()
 	r.check()
 	r.cdsManager.InitCDS(entity, cdsWatchers)
+	r.pprofTaskManager.InitPprofTask(entity)
 	r.bootFlag = true
 }
 
diff --git a/plugins/core/reporter/pprof_manager.go b/plugins/core/reporter/pprof_manager.go
new file mode 100644
index 0000000..e3d5d9d
--- /dev/null
+++ b/plugins/core/reporter/pprof_manager.go
@@ -0,0 +1,349 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reporter
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"strconv"
+	"time"
+
+	"github.com/apache/skywalking-go/plugins/core/operator"
+	commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
+	pprofv10 "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
+)
+
+const (
+	// max size in bytes of one content chunk sent on the collect stream (1 MiB)
+	maxChunkSize = 1 * 1024 * 1024
+	// capacity of the channel that buffers pprof data waiting to be uploaded
+	maxPprofSendQueueSize = 30000
+	// longest duration checkCommand accepts for tasks that are not sampled directly
+	pprofTaskDurationMaxMinute = 15 * time.Minute
+)
+
+type PprofTaskCommand interface {
+	GetTaskID() string
+	GetCreateTime() int64
+	GetDuration() time.Duration
+	GetDumpPeriod() int
+	StartTask() (io.Writer, error)
+	StopTask(io.Writer)
+	IsDirectSamplingType() bool
+	IsInvalidEvent() bool
+	HasDumpPeriod() bool
+}
+type PprofReporter interface {
+	ReportPprof(taskID string, content []byte)
+}
+
+var NewPprofTaskCommand func(taskID, events string, duration time.Duration,
+	createTime int64, dumpPeriod int, pprofFilePath string,
+	logger operator.LogOperator, manager PprofReporter) PprofTaskCommand
+
+type PprofTaskManager struct {
+	logger         operator.LogOperator
+	serverAddr     string
+	pprofInterval  time.Duration
+	PprofClient    pprofv10.PprofTaskClient // for grpc
+	connManager    *ConnectionManager
+	entity         *Entity
+	pprofFilePath  string
+	LastUpdateTime int64
+	commands       PprofTaskCommand
+	pprofSendCh    chan *pprofv10.PprofData
+}
+
+// NewPprofTaskManager creates a manager whose gRPC client uses the connection to
+// serverAddr; it fails on a non-positive interval or when no connection is available.
+func NewPprofTaskManager(logger operator.LogOperator, serverAddr string,
+	pprofInterval time.Duration, connManager *ConnectionManager,
+	pprofFilePath string) (*PprofTaskManager, error) {
+	if pprofInterval <= 0 {
+		logger.Errorf("pprof interval less than zero, pprof profiling is disabled")
+		return nil, fmt.Errorf("pprof interval less than zero, pprof profiling is disabled")
+	}
+	conn, err := connManager.GetConnection(serverAddr)
+	if err != nil {
+		return nil, err
+	}
+	return &PprofTaskManager{
+		logger:        logger,
+		serverAddr:    serverAddr,
+		pprofInterval: pprofInterval,
+		PprofClient:   pprofv10.NewPprofTaskClient(conn),
+		connManager:   connManager,
+		pprofFilePath: pprofFilePath,
+		pprofSendCh:   make(chan *pprofv10.PprofData, maxPprofSendQueueSize),
+	}, nil
+}
+
+// InitPprofTask starts the upload pipeline and a panic-guarded goroutine that polls for pprof tasks.
+func (r *PprofTaskManager) InitPprofTask(entity *Entity) {
+	r.entity = entity
+	r.initPprofSendPipeline()
+	go func() {
+		defer func() {
+			if err := recover(); err != nil {
+				r.logger.Errorf("PprofTaskManager InitPprofTask panic err %v", err)
+			}
+		}()
+		for {
+			switch r.connManager.GetConnectionStatus(r.serverAddr) {
+			case ConnectionStatusShutdown:
+				return
+			case ConnectionStatusDisconnect:
+				time.Sleep(r.pprofInterval)
+				continue
+			}
+			pprofCommand, err := r.PprofClient.GetPprofTaskCommands(context.Background(), &pprofv10.PprofTaskCommandQuery{
+				Service:         r.entity.ServiceName,
+				ServiceInstance: r.entity.ServiceInstanceName,
+				LastCommandTime: r.LastUpdateTime,
+			})
+			if err != nil {
+				r.logger.Errorf("fetch pprof task commands error %v", err)
+			} else if cmds := pprofCommand.GetCommands(); len(cmds) > 0 && cmds[0].Command == "PprofTaskQuery" {
+				r.HandleCommand(cmds[0])
+			}
+			time.Sleep(r.pprofInterval)
+		}
+	}()
+}
+
+// HandleCommand runs the pprof task described by rawCommand. Commands whose
+// create time is not newer than LastUpdateTime are ignored, as are commands
+// rejected by checkCommand.
+func (r *PprofTaskManager) HandleCommand(rawCommand *commonv3.Command) {
+	command := r.deserializePprofTaskCommand(rawCommand)
+	if command.GetCreateTime() <= r.LastUpdateTime {
+		return
+	}
+	r.LastUpdateTime = command.GetCreateTime()
+
+	if err := r.checkCommand(command); err != nil {
+		r.logger.Errorf("check command error, cannot process this pprof task. reason: %v", err)
+		return
+	}
+
+	direct := command.IsDirectSamplingType()
+	writer, err := command.StartTask()
+	if err != nil {
+		r.logger.Errorf("start %s pprof task error %v \n", command.GetTaskID(), err)
+		return
+	}
+
+	if direct {
+		// direct sampling of Heap, Allocs, Goroutine, Thread: dump right away
+		command.StopTask(writer)
+		return
+	}
+	// The CPU, Block and Mutex sampling lasts for a duration and then stops
+	time.AfterFunc(command.GetDuration(), func() {
+		command.StopTask(writer)
+	})
+}
+
+// deserializePprofTaskCommand builds a PprofTaskCommand from the key/value
+// arguments of a raw command. Keys that are absent, or whose value is
+// rejected, leave the corresponding field at its zero value. Duration is
+// interpreted as a number of minutes.
+func (r *PprofTaskManager) deserializePprofTaskCommand(command *commonv3.Command) PprofTaskCommand {
+	var (
+		taskID, events       string
+		duration, dumpPeriod int
+		createTime           int64
+	)
+	for _, pair := range command.Args {
+		switch pair.GetKey() {
+		case "TaskId":
+			taskID = pair.GetValue()
+		case "Events":
+			events = pair.GetValue()
+		case "Duration":
+			if val, err := strconv.Atoi(pair.GetValue()); err == nil && val > 0 {
+				duration = val
+			}
+		case "DumpPeriod":
+			if val, err := strconv.Atoi(pair.GetValue()); err == nil && val >= 0 {
+				dumpPeriod = val
+			}
+		case "CreateTime":
+			// ParseInt returns a clamped value together with a range error, so
+			// the value is only accepted when parsing succeeded
+			if val, err := strconv.ParseInt(pair.GetValue(), 10, 64); err == nil {
+				createTime = val
+			}
+		}
+	}
+
+	return NewPprofTaskCommand(taskID, events, time.Duration(duration)*time.Minute,
+		createTime, dumpPeriod, r.pprofFilePath, r.logger, r)
+}
+
+// checkCommand validates a deserialized task before it is executed. The task
+// needs a non-empty ID and a known event; events that are not sampled directly
+// need a duration within (0, pprofTaskDurationMaxMinute]; events with a dump
+// period need that period to be positive. It returns nil for a valid task.
+func (r *PprofTaskManager) checkCommand(command PprofTaskCommand) error {
+	switch {
+	case command.GetTaskID() == "":
+		return fmt.Errorf("pprof task id cannot be empty, task id is %s", command.GetTaskID())
+	case command.IsInvalidEvent():
+		return fmt.Errorf("pprof task event is invalid, task id is %s", command.GetTaskID())
+	case !command.IsDirectSamplingType() && (command.GetDuration() <= 0 || command.GetDuration() > pprofTaskDurationMaxMinute):
+		return fmt.Errorf("pprof task duration must be between 0 and %v, task id is %s", pprofTaskDurationMaxMinute, command.GetTaskID())
+	case command.HasDumpPeriod() && command.GetDumpPeriod() <= 0:
+		return fmt.Errorf("pprof task dumpperiod must be greater than 0, task id is %s", command.GetTaskID())
+	}
+	return nil
+}
+
+// ReportPprof queues the sampled profile of taskID for upload. The content is
+// wrapped in a PprofData message whose metadata carries the service, the
+// service instance, the task ID, a success status and the content size. When
+// the send queue is full the message is dropped and an error is logged.
+func (r *PprofTaskManager) ReportPprof(taskID string, content []byte) {
+	msg := &pprofv10.PprofData{
+		Metadata: &pprofv10.PprofMetaData{
+			Service:         r.entity.ServiceName,
+			ServiceInstance: r.entity.ServiceInstanceName,
+			TaskId:          taskID,
+			Type:            pprofv10.PprofProfilingStatus_PPROF_PROFILING_SUCCESS,
+			ContentSize:     int32(len(content)),
+		},
+		Result: &pprofv10.PprofData_Content{Content: content},
+	}
+
+	select {
+	case r.pprofSendCh <- msg:
+	default:
+		r.logger.Errorf("reach max pprof send buffer")
+	}
+}
+
+// initPprofSendPipeline starts the goroutine that drains pprofSendCh and uploads each entry.
+func (r *PprofTaskManager) initPprofSendPipeline() {
+	go func() {
+		defer func() {
+			if err := recover(); err != nil {
+				r.logger.Errorf("PprofTaskManager initPprofSendPipeline panic err %v", err)
+			}
+		}()
+		for {
+			switch r.connManager.GetConnectionStatus(r.serverAddr) {
+			case ConnectionStatusShutdown:
+				return
+			case ConnectionStatusDisconnect:
+				time.Sleep(5 * time.Second)
+				continue
+			}
+			// not shut down or disconnected: upload queued items until the channel is closed
+			for item := range r.pprofSendCh {
+				r.uploadPprofData(item)
+			}
+			return
+		}
+	}()
+}
+
+func (r *PprofTaskManager) uploadPprofData(pprofData *pprofv10.PprofData) {
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+
+	stream, err := r.PprofClient.Collect(ctx)
+	if err != nil {
+		r.logger.Errorf("failed to start collect stream: %v", err)
+		return
+	}
+
+	// Send metadata first
+	metadataMsg := &pprofv10.PprofData{
+		Metadata: pprofData.Metadata,
+	}
+	if err = stream.Send(metadataMsg); err != nil {
+		r.logger.Errorf("failed to send metadata: %v", err)
+		return
+	}
+
+	resp, err := stream.Recv()
+	if err != nil {
+		r.logger.Errorf("failed to receive server response: %v", err)
+		return
+	}
+
+	switch resp.Status {
+	case pprofv10.PprofProfilingStatus_PPROF_TERMINATED_BY_OVERSIZE:
+		r.logger.Errorf("pprof is too large to be received by the oap server")
+		return
+	case pprofv10.PprofProfilingStatus_PPROF_EXECUTION_TASK_ERROR:
+		r.logger.Errorf("server rejected pprof upload due to execution task error")
+		return
+	}
+
+	// Upload content in chunks
+	content := pprofData.GetContent()
+	chunkCount := 0
+	contentSize := len(content)
+
+	for offset := 0; offset < contentSize; offset += maxChunkSize {
+		end := offset + maxChunkSize
+		if end > contentSize {
+			end = contentSize
+		}
+
+		chunkData := &pprofv10.PprofData{
+			Result: &pprofv10.PprofData_Content{
+				Content: content[offset:end],
+			},
+		}
+
+		if err := stream.Send(chunkData); err != nil {
+			r.logger.Errorf("failed to send pprof chunk %d: %v", chunkCount, err)
+			return
+		}
+		chunkCount++
+		// Check context timeout
+		select {
+		case <-ctx.Done():
+			r.logger.Errorf("context timeout during chunk upload for task %s", pprofData.Metadata.TaskId)
+			return
+		default:
+		}
+	}
+
+	r.closePprofStream(stream)
+}
+func (r *PprofTaskManager) closePprofStream(stream pprofv10.PprofTask_CollectClient) {
+	if err := stream.CloseSend(); err != nil {
+		r.logger.Errorf("failed to close send stream: %v", err)
+		return
+	}
+
+	for {
+		_, err := stream.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			r.logger.Errorf("error receiving final response %v", err)
+			break
+		}
+	}
+}
diff --git a/test/plugins/scenarios/logrus/config/excepted.yml b/test/plugins/scenarios/logrus/config/excepted.yml
index 983c4aa..88bfca1 100644
--- a/test/plugins/scenarios/logrus/config/excepted.yml
+++ b/test/plugins/scenarios/logrus/config/excepted.yml
@@ -18,14 +18,23 @@
 meterItems: []
 logItems:
   - serviceName: logrus
-    logSize: ge 3
+    logSize: ge 4
     logs:
       - timestamp: nq 0
         endpoint: ''
         body:
           type: TEXT
-          content: { text: 'fetch dynamic configuration error rpc error: code = Unimplemented
-              desc = Method not found: skywalking.v3.ConfigurationDiscoveryService/fetchConfigurations' }
+          content: { text: not null }
+        traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
+        tags:
+          data:
+            - { key: LEVEL, value: error }
+        layer: GENERAL
+      - timestamp: nq 0
+        endpoint: ''
+        body:
+          type: TEXT
+          content: { text: not null }
         traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
         tags:
           data:
diff --git a/test/plugins/scenarios/zap/config/excepted.yml b/test/plugins/scenarios/zap/config/excepted.yml
index 0b927f9..dd1b2d8 100644
--- a/test/plugins/scenarios/zap/config/excepted.yml
+++ b/test/plugins/scenarios/zap/config/excepted.yml
@@ -18,14 +18,23 @@
 meterItems: []
 logItems:
   - serviceName: zap
-    logSize: ge 3
+    logSize: ge 4
     logs:
       - timestamp: nq 0
         endpoint: ''
         body:
           type: TEXT
-          content: { text: 'fetch dynamic configuration error rpc error: code = Unimplemented
-              desc = Method not found: skywalking.v3.ConfigurationDiscoveryService/fetchConfigurations' }
+          content: { text: not null }
+        traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
+        tags:
+          data:
+            - { key: LEVEL, value: error }
+        layer: GENERAL
+      - timestamp: nq 0
+        endpoint: ''
+        body:
+          type: TEXT
+          content: { text: not null }
         traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
         tags:
           data:
diff --git a/tools/go-agent/config/agent.default.yaml b/tools/go-agent/config/agent.default.yaml
index f07f22d..02d351f 100644
--- a/tools/go-agent/config/agent.default.yaml
+++ b/tools/go-agent/config/agent.default.yaml
@@ -54,6 +54,12 @@
     authentication: ${SW_AGENT_REPORTER_GRPC_AUTHENTICATION:}
     # The interval(s) of fetching dynamic configuration from backend.
     cds_fetch_interval: ${SW_AGENT_REPORTER_GRPC_CDS_FETCH_INTERVAL:20}
+    pprof:
+      # The interval(s) of fetching pprof task from backend.
+      pprof_fetch_interval: ${SW_AGENT_REPORTER_GRPC_PPROF_TASK_FETCH_INTERVAL:20}
+      # The pprof file path generated when executing the profile task.
+      pprof_file_path: ${SW_AGENT_REPORTER_GRPC_PROFILE_PPROF_FILE_PATH:}
+
     tls:
       # Whether to enable TLS with backend.
       enable: ${SW_AGENT_REPORTER_GRPC_TLS_ENABLE:false}
diff --git a/tools/go-agent/config/loader.go b/tools/go-agent/config/loader.go
index adc2a11..8d843df 100644
--- a/tools/go-agent/config/loader.go
+++ b/tools/go-agent/config/loader.go
@@ -83,11 +83,17 @@
 }
 
 type GRPCReporter struct {
-	BackendService   StringValue     `yaml:"backend_service"`
-	MaxSendQueue     StringValue     `yaml:"max_send_queue"`
-	Authentication   StringValue     `yaml:"authentication"`
-	CDSFetchInterval StringValue     `yaml:"cds_fetch_interval"`
-	TLS              GRPCReporterTLS `yaml:"tls"`
+	BackendService   StringValue       `yaml:"backend_service"`
+	MaxSendQueue     StringValue       `yaml:"max_send_queue"`
+	Authentication   StringValue       `yaml:"authentication"`
+	CDSFetchInterval StringValue       `yaml:"cds_fetch_interval"`
+	TLS              GRPCReporterTLS   `yaml:"tls"`
+	Pprof            GRPCReporterPprof `yaml:"pprof"`
+}
+
+type GRPCReporterPprof struct {
+	PprofFetchInterval StringValue `yaml:"pprof_fetch_interval"`
+	PprofFilePath      StringValue `yaml:"pprof_file_path"`
 }
 
 type GRPCReporterTLS struct {
diff --git a/tools/go-agent/instrument/reporter/instrument.go b/tools/go-agent/instrument/reporter/instrument.go
index 68409e6..43bb06a 100644
--- a/tools/go-agent/instrument/reporter/instrument.go
+++ b/tools/go-agent/instrument/reporter/instrument.go
@@ -160,7 +160,7 @@
 	reporterInitTemplate := baseReporterInitTemplate
 	if reporterType == consts.KafkaReporter {
 		reporterInitTemplate += `
-	_, cdsManager, err := initManager(logger, checkInterval)
+	_, cdsManager, _, err := initManager(logger, checkInterval)
 	if err != nil {
 		return nil, err
 	}
@@ -169,11 +169,11 @@
 		reporterInitTemplate += kafkaReporterInitFunc
 	} else {
 		reporterInitTemplate += `
-	connManager, cdsManager, err := initManager(logger, checkInterval)
+	connManager, cdsManager, pprofTaskManager, err := initManager(logger, checkInterval)
 	if err != nil {
 		return nil, err
 	}
-	return initGRPCReporter(logger, checkInterval, connManager, cdsManager)
+	return initGRPCReporter(logger, checkInterval, connManager, cdsManager, pprofTaskManager)
 }`
 		reporterInitTemplate += grpcReporterInitFunc
 	}
@@ -208,7 +208,7 @@
 
 const initManagerFunc = `
 
-func initManager(logger operator.LogOperator, checkInterval time.Duration) (*ConnectionManager, *CDSManager, error) {
+func initManager(logger operator.LogOperator, checkInterval time.Duration) (*ConnectionManager, *CDSManager, *PprofTaskManager, error) {
 	authenticationVal := {{.Config.Reporter.GRPC.Authentication.ToGoStringValue}}
 	backendServiceVal := {{.Config.Reporter.GRPC.BackendService.ToGoStringValue}}
 
@@ -229,16 +229,25 @@
 		connManager, err = NewConnectionManager(logger, checkInterval, backendServiceVal, authenticationVal, nil)
 	}
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
 
 	cdsFetchIntervalVal := {{.Config.Reporter.GRPC.CDSFetchInterval.ToGoIntValue "the cds fetch interval must be number"}}
 	cdsFetchInterval := time.Second * time.Duration(cdsFetchIntervalVal)
 	cdsManager, err := NewCDSManager(logger, backendServiceVal, cdsFetchInterval, connManager)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
-	return connManager, cdsManager, nil
+
+	pprofFetchIntervalVal := {{.Config.Reporter.GRPC.Pprof.PprofFetchInterval.ToGoIntValue "the pprof fetch interval must be number"}}
+	pprofFetchInterval := time.Second * time.Duration(pprofFetchIntervalVal)
+	pprofFilePath := {{.Config.Reporter.GRPC.Pprof.PprofFilePath.ToGoStringValue}}
+	pprofTaskManager, err := NewPprofTaskManager(logger, backendServiceVal, pprofFetchInterval, connManager, pprofFilePath)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	return connManager, cdsManager, pprofTaskManager, nil
 }
 `
 
@@ -247,13 +256,14 @@
 func initGRPCReporter(logger operator.LogOperator,
 					checkInterval time.Duration,
 					connManager *ConnectionManager,
-					cdsManager *CDSManager) (Reporter, error) {
+					cdsManager *CDSManager,
+					pprofTaskManager *PprofTaskManager) (Reporter, error) {
 	var opts []ReporterOption
 	maxSendQueueVal := {{.Config.Reporter.GRPC.MaxSendQueue.ToGoIntValue "the GRPC reporter max queue size must be number"}}
 	opts = append(opts, WithMaxSendQueueSize(maxSendQueueVal))
 
 	backendServiceVal := {{.Config.Reporter.GRPC.BackendService.ToGoStringValue}}
-	return NewGRPCReporter(logger, backendServiceVal, checkInterval, connManager, cdsManager, opts...)
+	return NewGRPCReporter(logger, backendServiceVal, checkInterval, connManager, cdsManager, pprofTaskManager, opts...)
 }
 `