// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// boot is the boot code for the Python SDK harness container. It is
// responsible for retrieving and installing staged files and for invoking
// Python correctly.
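//
// A typical invocation looks like the following (the endpoint value is
// illustrative; in practice a runner supplies these flags per the container
// contract, and the remaining endpoints are discovered via the provision
// service):
//
//	/opt/apache/beam/boot --id=1 --provision_endpoint=localhost:8080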
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"os/exec"
"os/signal"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"time"
"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
var (
acceptableWhlSpecs []string
// The setup_only option invokes the boot sequence only to process the provided artifacts, in order to build new images with dependencies pre-cached.
setupOnly = flag.Bool("setup_only", false, "Execute boot program in setup only mode (optional).")
artifacts = flag.String("artifacts", "", "Path to artifacts metadata file used in setup only mode (optional).")
// Contract: https://s.apache.org/beam-fn-api-container-contract.
workerPool = flag.Bool("worker_pool", false, "Run as worker pool (optional).")
id = flag.String("id", "", "Local identifier (required).")
loggingEndpoint = flag.String("logging_endpoint", "", "Logging endpoint (required).")
artifactEndpoint = flag.String("artifact_endpoint", "", "Artifact endpoint (required).")
provisionEndpoint = flag.String("provision_endpoint", "", "Provision endpoint (required).")
controlEndpoint = flag.String("control_endpoint", "", "Control endpoint (required).")
semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
)
const (
sdkHarnessEntrypoint = "apache_beam.runners.worker.sdk_worker_main"
// Please keep these names in sync with stager.py
workflowFile = "workflow.tar.gz"
requirementsFile = "requirements.txt"
sdkSrcFile = "dataflow_python_sdk.tar"
extraPackagesFile = "extra_packages.txt"
workerPoolIdEnv = "BEAM_PYTHON_WORKER_POOL_ID"
standardArtifactFileTypeUrn = "beam:artifact:type:file:v1"
)
func main() {
flag.Parse()
if *setupOnly {
processArtifactsInSetupOnlyMode()
os.Exit(0)
}
if *workerPool {
workerPoolId := fmt.Sprintf("%d", os.Getpid())
os.Setenv(workerPoolIdEnv, workerPoolId)
args := []string{
"-m",
"apache_beam.runners.worker.worker_pool_main",
"--service_port=50000",
"--container_executable=/opt/apache/beam/boot",
}
log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " "))
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
log.Fatalf("Python SDK worker pool exited with error: %v", err)
}
if err := execx.Execute(pythonVersion, args...); err != nil {
log.Fatalf("Python SDK worker pool exited with error: %v", err)
}
log.Print("Python SDK worker pool exited.")
os.Exit(0)
}
if *id == "" {
log.Fatalf("No id provided.")
}
if *provisionEndpoint == "" {
log.Fatalf("No provision endpoint provided.")
}
if err := launchSDKProcess(); err != nil {
log.Fatal(err)
}
}
func launchSDKProcess() error {
ctx := grpcx.WriteWorkerID(context.Background(), *id)
info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
if err != nil {
log.Fatalf("Failed to obtain provisioning information: %v", err)
}
log.Printf("Provision info:\n%v", info)
// TODO(BEAM-8201): Simplify once flags are no longer used.
if info.GetLoggingEndpoint().GetUrl() != "" {
*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
}
if info.GetArtifactEndpoint().GetUrl() != "" {
*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
}
if info.GetControlEndpoint().GetUrl() != "" {
*controlEndpoint = info.GetControlEndpoint().GetUrl()
}
if *loggingEndpoint == "" {
log.Fatalf("No logging endpoint provided.")
}
if *artifactEndpoint == "" {
log.Fatalf("No artifact endpoint provided.")
}
if *controlEndpoint == "" {
log.Fatalf("No control endpoint provided.")
}
logger := &tools.Logger{Endpoint: *loggingEndpoint}
logger.Printf(ctx, "Initializing python harness: %v", strings.Join(os.Args, " "))
// (1) Obtain the pipeline options
options, err := tools.ProtoToJSON(info.GetPipelineOptions())
if err != nil {
logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
}
// (2) Retrieve and install the staged packages.
//
// No log.Fatalf() from here on, otherwise deferred cleanups will not be called!
// Trap signals, so we can clean up properly.
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
// Create a separate virtual environment (with access to globally installed packages), unless disabled by the user.
// This improves usability on runners that persist the execution environment for the boot entrypoint between multiple pipeline executions.
if os.Getenv("RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT") == "" {
venvDir, err := setupVenv(ctx, logger, "/opt/apache/beam-venv", *id)
if err != nil {
return errors.New(
"failed to create a virtual environment. On Ubuntu systems, " +
"you might need to install the `python3-venv` package. " +
"To run the SDK process in the default environment instead, " +
"set the environment variable `RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1`. " +
"In custom Docker images, you can do that with an `ENV` statement. " +
fmt.Sprintf("Encountered error: %v", err))
}
cleanupFunc := func() {
os.RemoveAll(venvDir)
logger.Printf(ctx, "Cleaned up temporary venv for worker %v.", *id)
}
defer cleanupFunc()
}
dir := filepath.Join(*semiPersistDir, "staged")
files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
if err != nil {
fmtErr := fmt.Errorf("failed to retrieve staged files: %v", err)
// Send error message to logging service before returning up the call stack
logger.Errorf(ctx, "%s", fmtErr.Error())
// No need to fail the job if submission_environment_dependencies.txt cannot be loaded
if strings.Contains(fmtErr.Error(), "submission_environment_dependencies.txt") {
logger.Printf(ctx, "Ignore the error when loading submission_environment_dependencies.txt.")
} else {
return fmtErr
}
}
// TODO(herohde): the packages to install should be specified explicitly. It
// would also be possible to install the SDK in the Dockerfile.
fileNames := make([]string, len(files))
requirementsFiles := []string{requirementsFile}
for i, v := range files {
name, _ := artifact.MustExtractFilePayload(v)
logger.Printf(ctx, "Found artifact: %s", name)
fileNames[i] = name
if v.RoleUrn == artifact.URNPipRequirementsFile {
requirementsFiles = append(requirementsFiles, name)
}
}
if setupErr := installSetupPackages(ctx, logger, fileNames, dir, requirementsFiles); setupErr != nil {
fmtErr := fmt.Errorf("failed to install required packages: %v", setupErr)
// Send error message to logging service before returning up the call stack
logger.Errorf(ctx, "%s", fmtErr.Error())
return fmtErr
}
// (3) Invoke python
os.Setenv("PIPELINE_OPTIONS", options)
os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))
if info.GetStatusEndpoint() != nil {
os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", info.GetStatusEndpoint().String())
}
if metadata := info.GetMetadata(); metadata != nil {
if jobName, nameExists := metadata["job_name"]; nameExists {
os.Setenv("JOB_NAME", jobName)
}
if jobID, idExists := metadata["job_id"]; idExists {
os.Setenv("JOB_ID", jobID)
}
}
workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
// Keep track of child PIDs for clean shutdown without zombies
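// A mutex-protected struct is used here because both the signal-forwarding
// goroutine and the per-worker goroutines below read and mutate this state.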
childPids := struct {
v []int
canceled bool
mu sync.Mutex
}{v: make([]int, 0, len(workerIds))}
// Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
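// Note that signaling a negative PID (-pid) targets the whole process group
// created via Setpgid in StartCommandEnv, so grandchildren spawned by the
// Python workers are terminated as well.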
go func() {
logger.Printf(ctx, "Received signal: %v", <-signalChannel)
childPids.mu.Lock()
childPids.canceled = true
for _, pid := range childPids.v {
go func(pid int) {
// This goroutine will be canceled if the main process exits before the 5 seconds
// have elapsed, i.e., as soon as all subprocesses have returned from Wait().
time.Sleep(5 * time.Second)
if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil {
logger.Warnf(ctx, "Worker process %v did not respond, killed it.", pid)
}
}(pid)
syscall.Kill(-pid, syscall.SIGTERM)
}
childPids.mu.Unlock()
}()
args := []string{
"-m",
sdkHarnessEntrypoint,
}
var wg sync.WaitGroup
wg.Add(len(workerIds))
for _, workerId := range workerIds {
go func(workerId string) {
defer wg.Done()
bufLogger := tools.NewBufferedLogger(logger)
errorCount := 0
for {
childPids.mu.Lock()
if childPids.canceled {
childPids.mu.Unlock()
return
}
logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
cmd, startErr := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...)
if startErr != nil {
childPids.mu.Unlock()
logger.Fatalf(ctx, "Failed to start Python (worker %v): %v", workerId, startErr)
return
}
childPids.v = append(childPids.v, cmd.Process.Pid)
childPids.mu.Unlock()
if err := cmd.Wait(); err != nil {
// Retry on fatal errors, like OOMs and segfaults, not just
// DoFns throwing exceptions.
errorCount += 1
bufLogger.FlushAtError(ctx)
if errorCount < 4 {
logger.Warnf(ctx, "Python (worker %v) exited %v times: %v\nrestarting SDK process",
workerId, errorCount, err)
} else {
logger.Fatalf(ctx, "Python (worker %v) exited %v times: %v\nout of retries, failing container",
workerId, errorCount, err)
}
} else {
bufLogger.FlushAtDebug(ctx)
logger.Printf(ctx, "Python (worker %v) exited.", workerId)
break
}
}
}(workerId)
}
wg.Wait()
return nil
}
// StartCommandEnv starts a command in a new process group with the given
// arguments and additional environment variables, and attaches the given
// stdio streams to the child process. It returns the process handle along
// with any error encountered while starting the command.
func StartCommandEnv(env map[string]string, stdin io.Reader, stdout, stderr io.Writer, prog string, args ...string) (*exec.Cmd, error) {
cmd := exec.Command(prog, args...)
cmd.Stdin = stdin
cmd.Stdout = stdout
cmd.Stderr = stderr
if env != nil {
cmd.Env = os.Environ()
for k, v := range env {
cmd.Env = append(cmd.Env, k+"="+v)
}
}
// Create process group so we can clean up the whole subtree later without creating zombies
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
return cmd, cmd.Start()
}
// setupVenv initializes a local Python venv and sets the corresponding env variables
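// For worker ID 1, for example, this amounts to roughly the following shell
// commands (the exact interpreter name depends on the installed Python version):
//
//	python3 -m venv --system-site-packages /opt/apache/beam-venv/beam-venv-worker-1
//	export VIRTUAL_ENV=/opt/apache/beam-venv/beam-venv-worker-1
//	export PATH="$VIRTUAL_ENV/bin:$PATH"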
func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId string) (string, error) {
dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId)
logger.Printf(ctx, "Initializing temporary Python venv in %v", dir)
if _, err := os.Stat(dir); !os.IsNotExist(err) {
// Probably leftovers from a previous run
logger.Printf(ctx, "Cleaning up previous venv ...")
if err := os.RemoveAll(dir); err != nil {
return "", err
}
}
if err := os.MkdirAll(dir, 0750); err != nil {
return "", fmt.Errorf("failed to create Python venv directory: %s", err)
}
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return "", err
}
if err := execx.Execute(pythonVersion, "-m", "venv", "--system-site-packages", dir); err != nil {
return "", fmt.Errorf("python venv initialization failed: %s", err)
}
os.Setenv("VIRTUAL_ENV", dir)
os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
return dir, nil
}
// setupAcceptableWheelSpecs sets up the acceptable wheel specs according to the installed Python version.
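// For Python 3.10, for example, the accepted wheel name is
// cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.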
func setupAcceptableWheelSpecs() error {
cmd := exec.Command("python", "-V")
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
return err
}
re := regexp.MustCompile(`Python (\d)\.(\d+).*`)
pyVersions := re.FindStringSubmatch(string(stdoutStderr))
if len(pyVersions) != 3 {
return fmt.Errorf("cannot parse Python version from %s", stdoutStderr)
}
pyVersion := fmt.Sprintf("%s%s", pyVersions[1], pyVersions[2])
wheelName := fmt.Sprintf("cp%s-cp%s-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", pyVersion, pyVersion)
acceptableWhlSpecs = append(acceptableWhlSpecs, wheelName)
return nil
}
// installSetupPackages installs Beam SDK and user dependencies.
func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error {
bufLogger := tools.NewBufferedLogger(logger)
bufLogger.Printf(ctx, "Installing setup packages ...")
if err := setupAcceptableWheelSpecs(); err != nil {
bufLogger.Printf(ctx, "Failed to setup acceptable wheel specs, leave it as empty: %v", err)
}
// Install the Dataflow Python SDK if one was staged. In released
// container images, the SDK is already installed, but it can be overridden
// using the --sdk_location pipeline option.
if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil {
return fmt.Errorf("failed to install SDK: %v", err)
}
pkgName := "apache-beam"
isSdkInstalled := isPackageInstalled(pkgName)
if !isSdkInstalled {
return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/")
}
// The staged files will not disappear due to restarts because workDir is a
// folder that is mapped to the host (and therefore survives restarts).
for _, f := range requirementsFiles {
if err := pipInstallRequirements(ctx, logger, files, workDir, f); err != nil {
return fmt.Errorf("failed to install requirements: %v", err)
}
}
if err := installExtraPackages(ctx, logger, files, extraPackagesFile, workDir); err != nil {
return fmt.Errorf("failed to install extra packages: %v", err)
}
if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil {
return fmt.Errorf("failed to install workflow: %v", err)
}
if err := logRuntimeDependencies(ctx, bufLogger); err != nil {
bufLogger.Printf(ctx, "couldn't fetch the runtime python dependencies: %v", err)
}
if err := logSubmissionEnvDependencies(ctx, bufLogger, workDir); err != nil {
bufLogger.Printf(ctx, "couldn't fetch the submission environment dependencies: %v", err)
}
return nil
}
// processArtifactsInSetupOnlyMode installs the dependencies found in artifacts
// when the --setup_only and --artifacts flags are provided. Setup-only mode
// processes the provided artifacts and skips the actual worker start-up.
// This mode is useful for building new images with dependencies pre-installed,
// so that installation can be skipped at pipeline runtime.
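//
// The artifacts metadata file is expected to hold a JSON array of
// protojson-encoded ArtifactInformation messages, for example (payload
// abbreviated):
//
//	["{\"typeUrn\": \"beam:artifact:type:file:v1\", \"typePayload\": \"...\"}"]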
func processArtifactsInSetupOnlyMode() {
if *artifacts == "" {
log.Fatal("No --artifacts provided along with --setup_only flag.")
}
workDir := filepath.Dir(*artifacts)
metadata, err := os.ReadFile(*artifacts)
if err != nil {
log.Fatalf("Unable to open artifacts metadata file %v with error %v", *artifacts, err)
}
var infoJsons []string
if err := json.Unmarshal(metadata, &infoJsons); err != nil {
log.Fatalf("Unable to parse metadata, error: %v", err)
}
files := make([]string, len(infoJsons))
for i, info := range infoJsons {
var artifactInformation pipepb.ArtifactInformation
if err := protojson.Unmarshal([]byte(info), &artifactInformation); err != nil {
log.Fatalf("Unable to unmarshal artifact information from json string %v", info)
}
// For now we only expect artifacts of file type. Revisit this check if that assumption no longer holds.
if artifactInformation.GetTypeUrn() != standardArtifactFileTypeUrn {
log.Fatalf("Expect file artifact type in setup only mode, found %v.", artifactInformation.GetTypeUrn())
}
filePayload := &pipepb.ArtifactFilePayload{}
if err := proto.Unmarshal(artifactInformation.GetTypePayload(), filePayload); err != nil {
log.Fatal("Unable to unmarshal artifact information type payload.")
}
files[i] = filePayload.GetPath()
}
if setupErr := installSetupPackages(context.Background(), nil, files, workDir, []string{requirementsFile}); setupErr != nil {
log.Fatalf("Failed to install required packages: %v", setupErr)
}
}
// logRuntimeDependencies logs the python dependencies
// installed in the runtime environment.
func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger) error {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return err
}
bufLogger.Printf(ctx, "Using Python version:")
args := []string{"--version"}
if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil {
bufLogger.FlushAtError(ctx)
} else {
bufLogger.FlushAtDebug(ctx)
}
bufLogger.Printf(ctx, "Logging runtime dependencies:")
args = []string{"-m", "pip", "freeze"}
if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil {
bufLogger.FlushAtError(ctx)
} else {
bufLogger.FlushAtDebug(ctx)
}
return nil
}
// logSubmissionEnvDependencies logs the python dependencies
// installed in the submission environment.
func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.BufferedLogger, dir string) error {
bufLogger.Printf(ctx, "Logging submission environment dependencies:")
// The path for the submission environment dependencies file should match
// the one defined in apache_beam/runners/portability/stager.py.
filename := filepath.Join(dir, "submission_environment_dependencies.txt")
content, err := os.ReadFile(filename)
if err != nil {
return err
}
bufLogger.Printf(ctx, "%s", string(content))
return nil
}