blob: 64211e9fac25ed75ae2dd5e0681b1161e377cfc4 [file]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/apache/beam/sdks/v2/go/container/tools"
)
// profilerConfigKeyType is a dedicated, unexported type for the context key
// used to store the ProfilerConfig, so the key cannot collide with context
// keys defined by other packages.
type profilerConfigKeyType struct{}
// profilerConfigKey is the context key under which setupProfilerConfig stores
// the *ProfilerConfig and from which getProfilerConfig reads it back.
var profilerConfigKey profilerConfigKeyType
// ProfilerConfig holds all pre-computed profiling parameters.
//
// It is built once by setupProfilerConfig from the pipeline options, stored in
// the context, and read back by the other profiler helpers in this file via
// getProfilerConfig.
type ProfilerConfig struct {
// Enabled is set to true by setupProfilerConfig for every config it creates.
Enabled bool
// Agent is the requested profiler agent. maybeWithProfiler handles "memray"
// and "tcmalloc" specially; any other value is used as the program to launch.
Agent string
// ExtraArgs are additional arguments passed to `memray run` or to a generic
// agent program. The tcmalloc branch of maybeWithProfiler does not use them.
ExtraArgs []string
// ExtraEnvVars are KEY=VALUE entries added to the environment returned by
// maybeWithProfiler.
ExtraEnvVars []string
// Location is the ProfileLocation pipeline option exactly as provided.
Location string
// TempLocation is the local directory for this worker's profiles:
// BaseTempDir/<job id>/<hostname>.
TempLocation string
// BaseTempDir is the root local directory for profiles; it is also the
// directory passed to syncProfilesToGCS as the sync source.
BaseTempDir string
// StopSentinelPath is a file inside TempLocation; once it exists,
// maybeWithProfiler returns its inputs unchanged.
StopSentinelPath string
// GcsDestPath is Location without a trailing "/" when Location starts with
// "gs://", and empty otherwise.
GcsDestPath string
// UploadIntervalSec is the delay in seconds between periodic GCS syncs;
// values <= 0 disable the periodic upload.
UploadIntervalSec int
// StopAfterSec is copied from the ProfilerStopAfterSec option. It is not
// consumed by the code in this file; presumably the number of seconds after
// which profiling is stopped — TODO confirm with the caller.
StopAfterSec int
// StopAfterCrash is copied from the ProfilerStopAfterCrash option. It is not
// consumed by the code in this file — TODO confirm semantics with the caller.
StopAfterCrash bool
// PostprocessIntervalSec is the delay in seconds between memray
// post-processing sweeps; values <= 0 disable post-processing.
PostprocessIntervalSec int
}
// setupProfilerConfig derives a ProfilerConfig from the pipeline options and
// returns a child context that carries it. When no profiler agent is
// requested, ctx is returned unchanged, so getProfilerConfig yields nil.
//
// Local profile output is placed under <base>/<job id>/<hostname>, where
// <base> is the ProfileTempLocation option or, when that is empty,
// <semiPersistDir>/profiles. The logger argument is currently unused.
func setupProfilerConfig(ctx context.Context, logger *tools.Logger, opts *PipelineOptionsData) context.Context {
	if opts.Options.ProfilerAgent == "" {
		return ctx
	}

	// Start with everything that is copied verbatim from the options; the
	// derived paths are filled in below.
	cfg := &ProfilerConfig{
		Enabled:                true,
		Agent:                  opts.Options.ProfilerAgent,
		ExtraArgs:              opts.Options.ProfilerExtraArgs,
		ExtraEnvVars:           opts.Options.ProfilerExtraEnvVars,
		Location:               opts.Options.ProfileLocation,
		BaseTempDir:            opts.Options.ProfileTempLocation,
		UploadIntervalSec:      opts.Options.ProfileUploadIntervalSec,
		StopAfterSec:           opts.Options.ProfilerStopAfterSec,
		StopAfterCrash:         opts.Options.ProfilerStopAfterCrash,
		PostprocessIntervalSec: opts.Options.ProfilePostprocessIntervalSec,
	}
	if cfg.BaseTempDir == "" {
		cfg.BaseTempDir = filepath.Join(*semiPersistDir, "profiles")
	}

	jobID := opts.Options.JobId
	if jobID == "" {
		jobID = "BEAM_JOB"
	}
	// A failed hostname lookup is tolerated: an empty result falls back to a
	// fixed placeholder name.
	host, _ := os.Hostname()
	if host == "" {
		host = "default-worker"
	}

	cfg.TempLocation = filepath.Join(cfg.BaseTempDir, jobID, host)
	cfg.StopSentinelPath = filepath.Join(cfg.TempLocation, fmt.Sprintf(".profiler_disengaged_%s_%s", jobID, host))

	// Only gs:// locations are treated as upload destinations.
	if loc := cfg.Location; strings.HasPrefix(loc, "gs://") {
		cfg.GcsDestPath = strings.TrimSuffix(loc, "/")
	}

	return context.WithValue(ctx, profilerConfigKey, cfg)
}
// getProfilerConfig returns the ProfilerConfig stored in ctx under
// profilerConfigKey, or nil when the context carries no such value.
func getProfilerConfig(ctx context.Context) *ProfilerConfig {
	// A failed type assertion leaves cfg as the nil pointer, which is exactly
	// the "not configured" result callers check for.
	cfg, _ := ctx.Value(profilerConfigKey).(*ProfilerConfig)
	return cfg
}
// startProfilerBackgroundTasks logs the resolved profiler settings, creates
// the local profile directory and, when profiling is configured in ctx,
// launches the background goroutines:
//
//   - a periodic sync of BaseTempDir to GcsDestPath, if a GCS destination is
//     set, gcloud is on PATH and UploadIntervalSec is positive;
//   - the memray post-processing loop, if the agent is "memray".
//
// Both goroutines exit when ctx is cancelled. The function is a no-op when
// ctx carries no ProfilerConfig.
func startProfilerBackgroundTasks(ctx context.Context, logger *tools.Logger) {
	cfg := getProfilerConfig(ctx)
	if cfg == nil {
		return
	}

	logger.Printf(ctx, "Worker will be configured with profiler agent enabled.")
	settings := []struct {
		format string
		value  interface{}
	}{
		{"ProfilerAgent: %v", cfg.Agent},
		{"ProfilerExtraArgs: %v", cfg.ExtraArgs},
		{"ProfilerExtraEnvVars: %v", cfg.ExtraEnvVars},
		{"ProfileLocation: %v", cfg.Location},
		{"ProfileTempLocation: %v", cfg.BaseTempDir},
		{"ProfileUploadIntervalSec: %v", cfg.UploadIntervalSec},
		{"ProfilerStopAfterSec: %v", cfg.StopAfterSec},
		{"ProfilerStopAfterCrash: %v", cfg.StopAfterCrash},
		{"ProfilePostprocessIntervalSec: %v", cfg.PostprocessIntervalSec},
	}
	for _, s := range settings {
		logger.Printf(ctx, s.format, s.value)
	}

	// A failure here is only reported; the remaining setup still runs.
	if err := os.MkdirAll(cfg.TempLocation, 0755); err != nil {
		logger.Warnf(ctx, "Failed to create ProfileTempLocation: %v", err)
	}

	if cfg.GcsDestPath != "" {
		_, lookErr := exec.LookPath("gcloud")
		switch {
		case lookErr != nil:
			logger.Errorf(ctx, "gcloud is not available, profiles will not be uploaded.")
		case cfg.UploadIntervalSec > 0:
			go func() {
				for {
					// The wait restarts after each sync completes.
					wait := time.After(time.Duration(cfg.UploadIntervalSec) * time.Second)
					select {
					case <-wait:
						// TODO(tvalentyn): Consider a periodic cleanup as well to save local disk space.
						syncProfilesToGCS(ctx, logger, cfg.BaseTempDir, cfg.GcsDestPath)
					case <-ctx.Done():
						return
					}
				}
			}()
		}
	}

	if cfg.Agent == "memray" {
		go postProcessProfilesLoop(ctx, logger, cfg.TempLocation, cfg.PostprocessIntervalSec)
	}
}
// maybeWithProfiler rewrites the worker launch command so that it runs under
// the configured profiler agent.
//
// It returns the program, arguments and environment to launch, plus a flag
// telling whether profiling was applied. The inputs are returned unchanged
// (flag false) when ctx carries no ProfilerConfig or when the stop sentinel
// file already exists. Otherwise currentEnv is copied, never mutated, and:
//
//   - "memray":   the program is kept and the arguments become
//     `-m memray run <ExtraArgs> -o <file> -m <sdkHarnessEntrypoint>`;
//   - "tcmalloc": program and arguments are kept, libtcmalloc is appended to
//     LD_PRELOAD and HEAPPROFILE points into TempLocation;
//   - otherwise:  the agent itself becomes the program and is invoked as
//     `<agent> <ExtraArgs> <currentProg> <currentArgs>`.
//
// ExtraEnvVars entries of the form KEY=VALUE are applied last, so they
// override anything set above; malformed entries are logged and skipped.
func maybeWithProfiler(
	ctx context.Context,
	logger *tools.Logger,
	workerId string,
	currentProg string,
	currentArgs []string,
	currentEnv map[string]string,
) (string, []string, map[string]string, bool) {
	pcfg := getProfilerConfig(ctx)
	if pcfg == nil {
		return currentProg, currentArgs, currentEnv, false
	}
	// The sentinel is created by stopProfiling; once present, workers are
	// launched without the profiler.
	if _, err := os.Stat(pcfg.StopSentinelPath); err == nil {
		return currentProg, currentArgs, currentEnv, false
	}

	prog := currentProg
	var args []string

	// Work on a copy so the caller's map is never modified.
	env := make(map[string]string, len(currentEnv)+len(pcfg.ExtraEnvVars)+2)
	for k, v := range currentEnv {
		env[k] = v
	}

	switch pcfg.Agent {
	case "memray":
		timeSuffix := time.Now().Format("20060102150405")
		memrayFile := filepath.Join(pcfg.TempLocation, fmt.Sprintf("memray-%s-%s.bin", workerId, timeSuffix))
		args = []string{"-m", "memray", "run"}
		args = append(args, pcfg.ExtraArgs...)
		args = append(args, "-o", memrayFile, "-m", sdkHarnessEntrypoint)
	case "tcmalloc":
		tcmallocHeapPath := filepath.Join(pcfg.TempLocation, fmt.Sprintf("tcmalloc-%s", workerId))
		// Prefer an LD_PRELOAD the caller already put into the worker
		// environment; otherwise extend the one of this process. Reading only
		// the process environment would silently drop the caller's value.
		existingPreload := env["LD_PRELOAD"]
		if existingPreload == "" {
			existingPreload = os.Getenv("LD_PRELOAD")
		}
		if existingPreload != "" {
			env["LD_PRELOAD"] = existingPreload + ":libtcmalloc.so.4"
		} else {
			env["LD_PRELOAD"] = "libtcmalloc.so.4"
		}
		env["HEAPPROFILE"] = tcmallocHeapPath
		args = currentArgs
	default:
		prog = pcfg.Agent
		args = make([]string, 0, len(pcfg.ExtraArgs)+1+len(currentArgs))
		args = append(args, pcfg.ExtraArgs...)
		args = append(args, currentProg)
		args = append(args, currentArgs...)
	}

	for _, envVar := range pcfg.ExtraEnvVars {
		parts := strings.SplitN(envVar, "=", 2)
		// An empty key ("=value") would yield an invalid environment entry,
		// so it is rejected together with entries lacking "=".
		if len(parts) != 2 || parts[0] == "" {
			logger.Errorf(ctx, "Failed to parse profiler extra environment variable: %v. Expected format KEY=VALUE", envVar)
			continue
		}
		env[parts[0]] = parts[1]
	}
	return prog, args, env, true
}
// stopProfiling creates an empty file at StopSentinelPath to signal that
// profiling should stop; maybeWithProfiler checks for this file before
// wrapping a worker launch.
//
// It returns nil when ctx carries no ProfilerConfig. The sentinel's parent
// directory is created if missing, so the signal can still be written when
// the profile directory was never created. Errors from creating the
// directory, creating the file or closing it are returned to the caller.
func stopProfiling(ctx context.Context) error {
	pcfg := getProfilerConfig(ctx)
	if pcfg == nil {
		return nil
	}
	if err := os.MkdirAll(filepath.Dir(pcfg.StopSentinelPath), 0755); err != nil {
		return err
	}
	f, err := os.Create(pcfg.StopSentinelPath)
	if err != nil {
		return err
	}
	return f.Close()
}
// syncProfilesToGCS mirrors localDir to gcsDest by running
// `gcloud storage rsync -r localDir gcsDest`.
//
// Nothing is done when localDir does not exist or is empty. Failures are
// logged rather than returned, because the sync is a best-effort background
// task. On failure the tail of gcloud's combined stdout/stderr is included in
// the warning, since the exit status alone rarely explains the cause.
func syncProfilesToGCS(ctx context.Context, logger *tools.Logger, localDir, gcsDest string) {
	entries, err := os.ReadDir(localDir)
	if err != nil {
		// A missing directory just means no profiles were produced yet.
		if !os.IsNotExist(err) {
			logger.Warnf(ctx, "Failed to list profiles in %s: %v", localDir, err)
		}
		return
	}
	if len(entries) == 0 {
		return
	}

	logger.Printf(ctx, "Syncing profiles from %s to %s", localDir, gcsDest)
	out, err := exec.CommandContext(ctx, "gcloud", "storage", "rsync", "-r", localDir, gcsDest).CombinedOutput()
	if err != nil {
		// Keep only the tail of the output so a chatty transfer cannot
		// produce an oversized log entry.
		const maxLoggedOutput = 4096
		if len(out) > maxLoggedOutput {
			out = out[len(out)-maxLoggedOutput:]
		}
		logger.Warnf(ctx, "Failed to sync profiles to GCS: %v; gcloud output: %s", err, out)
		return
	}
	logger.Printf(ctx, "Successfully synced profiles to GCS.")
}
// postProcessProfilesLoop repeatedly post-processes the profiles found in
// profilesDir, pausing intervalSec seconds between sweeps, until ctx is
// cancelled. The first sweep runs immediately. A non-positive intervalSec
// disables post-processing, and the function returns right away.
func postProcessProfilesLoop(ctx context.Context, logger *tools.Logger, profilesDir string, intervalSec int) {
	if intervalSec <= 0 {
		return
	}
	pause := time.Duration(intervalSec) * time.Second
	for {
		runPostProcessingSweep(ctx, logger, profilesDir, intervalSec)

		// The pause starts once the sweep has finished, so sweeps never overlap.
		select {
		case <-time.After(pause):
			continue
		case <-ctx.Done():
			return
		}
	}
}
// runPostProcessingSweep scans profilesDir for non-empty, non-hidden "*.bin"
// memray captures and, one after another, regenerates the reports that are
// missing or older than their capture:
//
//   - <name>.html       — peak-memory flamegraph
//   - <name>_leaks.html — leaks flamegraph
//
// The sweep stops early once ctx is cancelled. Errors are logged, never
// returned. intervalSec is accepted for signature compatibility and is not
// used.
func runPostProcessingSweep(ctx context.Context, logger *tools.Logger, profilesDir string, intervalSec int) {
	files, err := os.ReadDir(profilesDir)
	if err != nil {
		return
	}
	for _, file := range files {
		// Every command would fail immediately on a cancelled context, so
		// stop instead of logging one warning per remaining profile.
		if ctx.Err() != nil {
			return
		}
		name := file.Name()
		if file.IsDir() || !strings.HasSuffix(name, ".bin") || strings.HasPrefix(name, ".") {
			continue
		}
		binPath := filepath.Join(profilesDir, name)
		binInfo, err := os.Stat(binPath)
		if err != nil || binInfo.Size() == 0 {
			continue
		}

		reportBase := strings.TrimSuffix(binPath, ".bin")
		peakHtml := reportBase + ".html"
		leaksHtml := reportBase + "_leaks.html"

		peakReportStale := needsProcessing(binInfo, peakHtml)
		leakReportStale := needsProcessing(binInfo, leaksHtml)
		if peakReportStale || leakReportStale {
			binSizeMb := float64(binInfo.Size()) / (1024 * 1024)
			logger.Printf(ctx, "Post-processing profile %s of size %.2f MB", filepath.Base(binPath), binSizeMb)
		}

		// 1. Peak Flamegraph
		if peakReportStale {
			renderMemrayFlamegraph(ctx, logger, "peak", binPath, peakHtml, binInfo, nil)
		}
		// 2. Leaks Flamegraph
		if leakReportStale {
			renderMemrayFlamegraph(ctx, logger, "leaks", binPath, leaksHtml, binInfo, []string{"--leaks"})
		}
	}
}

// renderMemrayFlamegraph runs `python -m memray flamegraph` for binPath and
// publishes the result at htmlPath.
//
// The report is written to htmlPath+".tmp" and renamed into place only on
// success, so an existing report is never replaced by a partial one. The
// temporary file is removed on failure. kind ("peak" or "leaks") is used only
// in log messages; extraFlags are inserted after "-f".
func renderMemrayFlamegraph(ctx context.Context, logger *tools.Logger, kind, binPath, htmlPath string, binInfo os.FileInfo, extraFlags []string) {
	filename := filepath.Base(binPath)
	tmpPath := htmlPath + ".tmp"

	args := []string{"-m", "memray", "flamegraph", "-f"}
	args = append(args, extraFlags...)
	args = append(args, "-o", tmpPath, binPath)

	if err := exec.CommandContext(ctx, "python", args...).Run(); err != nil {
		logger.Warnf(ctx, "Failed to generate %s flamegraph for %s: %v", kind, filename, err)
		// Best-effort cleanup; the file may not exist if memray failed early.
		_ = os.Remove(tmpPath)
		return
	}
	if err := os.Rename(tmpPath, htmlPath); err != nil {
		logger.Warnf(ctx, "Failed to rename %s flamegraph for %s: %v", kind, filename, err)
		_ = os.Remove(tmpPath)
		return
	}
	logger.Printf(ctx, "Successfully updated %s flamegraph for %s", kind, filename)
	// Stamp the report with the capture's mtime as observed before
	// processing, so writes made to the capture in the meantime are detected
	// by the next sweep. A failure only causes a redundant regeneration,
	// hence the error is ignored.
	_ = os.Chtimes(htmlPath, binInfo.ModTime(), binInfo.ModTime())
}
// needsProcessing reports whether the report at path has to be (re)generated
// for the profile described by binInfo. That is the case when the report
// cannot be stat'ed (missing or otherwise inaccessible) or when it is older
// than the profile.
func needsProcessing(binInfo os.FileInfo, path string) bool {
	reportInfo, err := os.Stat(path)
	if err != nil {
		// Missing and unreadable reports are treated alike: regenerate.
		return true
	}
	// Don't regenerate when there were no updates to the profile.
	return reportInfo.ModTime().Before(binInfo.ModTime())
}