| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // boot is the boot code for the Python SDK harness container. It is responsible |
| // for retrieving and installing staged files and invoking Python correctly. |
| package main |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "flag" |
| "fmt" |
| "io" |
| "log" |
| "os" |
| "os/exec" |
| "os/signal" |
| "path/filepath" |
| "regexp" |
| "strings" |
| "sync" |
| "syscall" |
| "time" |
| |
| "github.com/apache/beam/sdks/v2/go/container/tools" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" |
| pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" |
| "google.golang.org/protobuf/encoding/protojson" |
| "google.golang.org/protobuf/proto" |
| ) |
| |
| var ( |
| acceptableWhlSpecs []string |
| |
| // The setup_only option invokes the boot sequence only to process the provided artifacts; it is used to build new images with dependencies pre-cached. |
| setupOnly = flag.Bool("setup_only", false, "Execute boot program in setup only mode (optional).") |
| artifacts = flag.String("artifacts", "", "Path to artifacts metadata file used in setup only mode (optional).") |
| |
| // Contract: https://s.apache.org/beam-fn-api-container-contract. |
| |
| workerPool = flag.Bool("worker_pool", false, "Run as worker pool (optional).") |
| id = flag.String("id", "", "Local identifier (required).") |
| loggingEndpoint = flag.String("logging_endpoint", "", "Logging endpoint (required).") |
| artifactEndpoint = flag.String("artifact_endpoint", "", "Artifact endpoint (required).") |
| provisionEndpoint = flag.String("provision_endpoint", "", "Provision endpoint (required).") |
| controlEndpoint = flag.String("control_endpoint", "", "Control endpoint (required).") |
| semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).") |
| ) |
| |
| const ( |
| sdkHarnessEntrypoint = "apache_beam.runners.worker.sdk_worker_main" |
| // Please keep these names in sync with stager.py |
| workflowFile = "workflow.tar.gz" |
| requirementsFile = "requirements.txt" |
| sdkSrcFile = "dataflow_python_sdk.tar" |
| extraPackagesFile = "extra_packages.txt" |
| workerPoolIdEnv = "BEAM_PYTHON_WORKER_POOL_ID" |
| |
| standardArtifactFileTypeUrn = "beam:artifact:type:file:v1" |
| ) |
| |
| func main() { |
| flag.Parse() |
| |
| if *setupOnly { |
| processArtifactsInSetupOnlyMode() |
| os.Exit(0) |
| } |
| |
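| // In worker pool mode, run the Python worker pool main instead of a single SDK harness. |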
| if *workerPool { |
| workerPoolId := fmt.Sprintf("%d", os.Getpid()) |
| os.Setenv(workerPoolIdEnv, workerPoolId) |
| args := []string{ |
| "-m", |
| "apache_beam.runners.worker.worker_pool_main", |
| "--service_port=50000", |
| "--container_executable=/opt/apache/beam/boot", |
| } |
| log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " ")) |
| pythonVersion, err := expansionx.GetPythonVersion() |
| if err != nil { |
| log.Fatalf("Python SDK worker pool exited with error: %v", err) |
| } |
| if err := execx.Execute(pythonVersion, args...); err != nil { |
| log.Fatalf("Python SDK worker pool exited with error: %v", err) |
| } |
| log.Print("Python SDK worker pool exited.") |
| os.Exit(0) |
| } |
| |
| if *id == "" { |
| log.Fatalf("No id provided.") |
| } |
| if *provisionEndpoint == "" { |
| log.Fatalf("No provision endpoint provided.") |
| } |
| |
| if err := launchSDKProcess(); err != nil { |
| log.Fatal(err) |
| } |
| } |
| |
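| // launchSDKProcess obtains provisioning information, materializes and installs |
| // the staged dependencies, and then runs one Python SDK harness process per |
| // worker id, restarting a failed harness a limited number of times before |
| // failing the container. |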
| func launchSDKProcess() error { |
| ctx := grpcx.WriteWorkerID(context.Background(), *id) |
| |
| info, err := tools.ProvisionInfo(ctx, *provisionEndpoint) |
| if err != nil { |
| log.Fatalf("Failed to obtain provisioning information: %v", err) |
| } |
| log.Printf("Provision info:\n%v", info) |
| |
| // TODO(BEAM-8201): Simplify once flags are no longer used. |
| if info.GetLoggingEndpoint().GetUrl() != "" { |
| *loggingEndpoint = info.GetLoggingEndpoint().GetUrl() |
| } |
| if info.GetArtifactEndpoint().GetUrl() != "" { |
| *artifactEndpoint = info.GetArtifactEndpoint().GetUrl() |
| } |
| if info.GetControlEndpoint().GetUrl() != "" { |
| *controlEndpoint = info.GetControlEndpoint().GetUrl() |
| } |
| |
| if *loggingEndpoint == "" { |
| log.Fatalf("No logging endpoint provided.") |
| } |
| if *artifactEndpoint == "" { |
| log.Fatalf("No artifact endpoint provided.") |
| } |
| if *controlEndpoint == "" { |
| log.Fatalf("No control endpoint provided.") |
| } |
| logger := &tools.Logger{Endpoint: *loggingEndpoint} |
| logger.Printf(ctx, "Initializing python harness: %v", strings.Join(os.Args, " ")) |
| |
| // (1) Obtain the pipeline options |
| |
| options, err := tools.ProtoToJSON(info.GetPipelineOptions()) |
| if err != nil { |
| logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) |
| } |
| |
| // (2) Retrieve and install the staged packages. |
| // |
| // No log.Fatalf() from here on, otherwise deferred cleanups will not be called! |
| |
| // Trap signals, so we can clean up properly. |
| signalChannel := make(chan os.Signal, 1) |
| signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) |
| |
| // Create a separate virtual environment (with access to globally installed packages), unless disabled by the user. |
| // This improves usability on runners that persist the execution environment for the boot entrypoint between multiple pipeline executions. |
| if os.Getenv("RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT") == "" { |
| venvDir, err := setupVenv(ctx, logger, "/opt/apache/beam-venv", *id) |
| if err != nil { |
| return errors.New( |
| "failed to create a virtual environment. If running on Ubuntu systems, " + |
| "you might need to install `python3-venv` package. " + |
| "To run the SDK process in default environment instead, " + |
| "set the environment variable `RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1`. " + |
| "In custom Docker images, you can do that with an `ENV` statement. " + |
| fmt.Sprintf("Encountered error: %v", err)) |
| } |
| cleanupFunc := func() { |
| os.RemoveAll(venvDir) |
| logger.Printf(ctx, "Cleaned up temporary venv for worker %v.", *id) |
| } |
| defer cleanupFunc() |
| } |
| |
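| // Materialize the staged artifact dependencies into the semi-persistent staging directory. |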
| dir := filepath.Join(*semiPersistDir, "staged") |
| files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir) |
| if err != nil { |
| fmtErr := fmt.Errorf("failed to retrieve staged files: %v", err) |
| // Send error message to logging service before returning up the call stack |
| logger.Errorf(ctx, "%s", fmtErr.Error()) |
| // No need to fail the job if submission_environment_dependencies.txt cannot be loaded |
| if strings.Contains(fmtErr.Error(), "submission_environment_dependencies.txt") { |
| logger.Printf(ctx, "Ignore the error when loading submission_environment_dependencies.txt.") |
| } else { |
| return fmtErr |
| } |
| } |
| |
| // TODO(herohde): the packages to install should be specified explicitly. It |
| // would also be possible to install the SDK in the Dockerfile. |
| fileNames := make([]string, len(files)) |
| requirementsFiles := []string{requirementsFile} |
| for i, v := range files { |
| name, _ := artifact.MustExtractFilePayload(v) |
| logger.Printf(ctx, "Found artifact: %s", name) |
| fileNames[i] = name |
| |
| if v.RoleUrn == artifact.URNPipRequirementsFile { |
| requirementsFiles = append(requirementsFiles, name) |
| } |
| } |
| |
| if setupErr := installSetupPackages(ctx, logger, fileNames, dir, requirementsFiles); setupErr != nil { |
| fmtErr := fmt.Errorf("failed to install required packages: %v", setupErr) |
| // Send error message to logging service before returning up the call stack |
| logger.Errorf(ctx, "%s", fmtErr.Error()) |
| return fmtErr |
| } |
| |
| // (3) Invoke python |
| |
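| // Pass pipeline options, service endpoints, and runner capabilities to the |
| // SDK harness through environment variables. |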
| os.Setenv("PIPELINE_OPTIONS", options) |
| os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir) |
| os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String()) |
| os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String()) |
| os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " ")) |
| |
| if info.GetStatusEndpoint() != nil { |
| os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", info.GetStatusEndpoint().String()) |
| } |
| |
| if metadata := info.GetMetadata(); metadata != nil { |
| if jobName, nameExists := metadata["job_name"]; nameExists { |
| os.Setenv("JOB_NAME", jobName) |
| } |
| if jobID, idExists := metadata["job_id"]; idExists { |
| os.Setenv("JOB_ID", jobID) |
| } |
| } |
| |
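| // The provision info may include sibling worker ids, in which case this |
| // container runs one SDK harness process per worker id. |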
| workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...) |
| |
| // Keep track of child PIDs for clean shutdown without zombies |
| childPids := struct { |
| v []int |
| canceled bool |
| mu sync.Mutex |
| }{v: make([]int, 0, len(workerIds))} |
| |
| // Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies |
| go func() { |
| logger.Printf(ctx, "Received signal: %v", <-signalChannel) |
| childPids.mu.Lock() |
| childPids.canceled = true |
| for _, pid := range childPids.v { |
| go func(pid int) { |
| // This goroutine will be canceled if the main process exits before the 5 seconds |
| // have elapsed, i.e., as soon as all subprocesses have returned from Wait(). |
| time.Sleep(5 * time.Second) |
| if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil { |
| logger.Warnf(ctx, "Worker process %v did not respond, killed it.", pid) |
| } |
| }(pid) |
| syscall.Kill(-pid, syscall.SIGTERM) |
| } |
| childPids.mu.Unlock() |
| }() |
| |
| args := []string{ |
| "-m", |
| sdkHarnessEntrypoint, |
| } |
| |
| var wg sync.WaitGroup |
| wg.Add(len(workerIds)) |
| for _, workerId := range workerIds { |
| go func(workerId string) { |
| defer wg.Done() |
| |
| bufLogger := tools.NewBufferedLogger(logger) |
| errorCount := 0 |
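| // Restart the SDK harness if it exits with an error, giving up after a few attempts. |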
| for { |
| childPids.mu.Lock() |
| if childPids.canceled { |
| childPids.mu.Unlock() |
| return |
| } |
| logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " ")) |
| cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...) |
| childPids.v = append(childPids.v, cmd.Process.Pid) |
| childPids.mu.Unlock() |
| |
| if err := cmd.Wait(); err != nil { |
| // Retry on fatal errors, like OOMs and segfaults, not just |
| // DoFns throwing exceptions. |
| errorCount += 1 |
| bufLogger.FlushAtError(ctx) |
| if errorCount < 4 { |
| logger.Warnf(ctx, "Python (worker %v) exited %v times: %v\nrestarting SDK process", |
| workerId, errorCount, err) |
| } else { |
| logger.Fatalf(ctx, "Python (worker %v) exited %v times: %v\nout of retries, failing container", |
| workerId, errorCount, err) |
| } |
| } else { |
| bufLogger.FlushAtDebug(ctx) |
| logger.Printf(ctx, "Python (worker %v) exited.", workerId) |
| break |
| } |
| } |
| }(workerId) |
| } |
| wg.Wait() |
| return nil |
| } |
| |
| // StartCommandEnv starts a command in a new process group with the given |
| // arguments and additional environment variables. It attaches the provided |
| // stdio to the child process and returns the command handle. |
| func StartCommandEnv(env map[string]string, stdin io.Reader, stdout, stderr io.Writer, prog string, args ...string) *exec.Cmd { |
| cmd := exec.Command(prog, args...) |
| cmd.Stdin = stdin |
| cmd.Stdout = stdout |
| cmd.Stderr = stderr |
| if env != nil { |
| cmd.Env = os.Environ() |
| for k, v := range env { |
| cmd.Env = append(cmd.Env, k+"="+v) |
| } |
| } |
| |
| // Create process group so we can clean up the whole subtree later without creating zombies |
| cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0} |
| if err := cmd.Start(); err != nil { |
| log.Printf("Failed to start %v: %v", prog, err) |
| } |
| return cmd |
| } |
| |
| // setupVenv initializes a local Python venv and sets the corresponding environment variables. |
| func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId string) (string, error) { |
| dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId) |
| logger.Printf(ctx, "Initializing temporary Python venv in %v", dir) |
| if _, err := os.Stat(dir); !os.IsNotExist(err) { |
| // Probably leftovers from a previous run |
| logger.Printf(ctx, "Cleaning up previous venv ...") |
| if err := os.RemoveAll(dir); err != nil { |
| return "", err |
| } |
| } |
| if err := os.MkdirAll(dir, 0750); err != nil { |
| return "", fmt.Errorf("failed to create Python venv directory: %s", err) |
| } |
| pythonVersion, err := expansionx.GetPythonVersion() |
| if err != nil { |
| return "", err |
| } |
| if err := execx.Execute(pythonVersion, "-m", "venv", "--system-site-packages", dir); err != nil { |
| return "", fmt.Errorf("python venv initialization failed: %s", err) |
| } |
| |
| os.Setenv("VIRTUAL_ENV", dir) |
| os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":")) |
| return dir, nil |
| } |
| |
| // setupAcceptableWheelSpecs sets the acceptable wheel specs according to the installed Python version. |
| func setupAcceptableWheelSpecs() error { |
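| // Determine the interpreter version from the output of "python -V". |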
| cmd := exec.Command("python", "-V") |
| stdoutStderr, err := cmd.CombinedOutput() |
| if err != nil { |
| return err |
| } |
| re := regexp.MustCompile(`Python (\d)\.(\d+).*`) |
| pyVersions := re.FindStringSubmatch(string(stdoutStderr[:])) |
| if len(pyVersions) != 3 { |
| return fmt.Errorf("cannot get parse Python version from %s", stdoutStderr) |
| } |
| pyVersion := fmt.Sprintf("%s%s", pyVersions[1], pyVersions[2]) |
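| // Accept manylinux2014 x86_64 wheels built for the running interpreter (e.g. cp311-cp311). |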
| wheelName := fmt.Sprintf("cp%s-cp%s-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", pyVersion, pyVersion) |
| acceptableWhlSpecs = append(acceptableWhlSpecs, wheelName) |
| return nil |
| } |
| |
| // installSetupPackages installs Beam SDK and user dependencies. |
| func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error { |
| bufLogger := tools.NewBufferedLogger(logger) |
| bufLogger.Printf(ctx, "Installing setup packages ...") |
| |
| if err := setupAcceptableWheelSpecs(); err != nil { |
| bufLogger.Printf(ctx, "Failed to setup acceptable wheel specs, leave it as empty: %v", err) |
| } |
| |
| // Install the Dataflow Python SDK if one was staged. In released container |
| // images, the SDK is already installed but can be overridden using the |
| // --sdk_location pipeline option. |
| if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil { |
| return fmt.Errorf("failed to install SDK: %v", err) |
| } |
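| // Confirm that the apache-beam package is present after the SDK installation step. |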
| pkgName := "apache-beam" |
| isSdkInstalled := isPackageInstalled(pkgName) |
| if !isSdkInstalled { |
| return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/") |
| } |
| // The staged files will not disappear due to restarts because workDir is a |
| // folder that is mapped to the host (and therefore survives restarts). |
| for _, f := range requirementsFiles { |
| if err := pipInstallRequirements(ctx, logger, files, workDir, f); err != nil { |
| return fmt.Errorf("failed to install requirements: %v", err) |
| } |
| } |
| if err := installExtraPackages(ctx, logger, files, extraPackagesFile, workDir); err != nil { |
| return fmt.Errorf("failed to install extra packages: %v", err) |
| } |
| if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil { |
| return fmt.Errorf("failed to install workflow: %v", err) |
| } |
| if err := logRuntimeDependencies(ctx, bufLogger); err != nil { |
| bufLogger.Printf(ctx, "couldn't fetch the runtime python dependencies: %v", err) |
| } |
| if err := logSubmissionEnvDependencies(ctx, bufLogger, workDir); err != nil { |
| bufLogger.Printf(ctx, "couldn't fetch the submission environment dependencies: %v", err) |
| } |
| |
| return nil |
| } |
| |
| // processArtifactsInSetupOnlyMode installs the dependencies found in the |
| // artifacts when the --setup_only and --artifacts flags are provided. This mode |
| // only processes the provided artifacts and skips the actual worker program |
| // startup. It is useful for building new images with dependencies pre-installed |
| // so that installation can be skipped at pipeline runtime. |
| func processArtifactsInSetupOnlyMode() { |
| if *artifacts == "" { |
| log.Fatal("No --artifacts provided along with --setup_only flag.") |
| } |
| workDir := filepath.Dir(*artifacts) |
| metadata, err := os.ReadFile(*artifacts) |
| if err != nil { |
| log.Fatalf("Unable to open artifacts metadata file %v with error %v", *artifacts, err) |
| } |
| var infoJsons []string |
| if err := json.Unmarshal(metadata, &infoJsons); err != nil { |
| log.Fatalf("Unable to parse metadata, error: %v", err) |
| } |
| |
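| // Extract the local file path from each artifact's file-type payload. |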
| files := make([]string, len(infoJsons)) |
| for i, info := range infoJsons { |
| var artifactInformation pipepb.ArtifactInformation |
| if err := protojson.Unmarshal([]byte(info), &artifactInformation); err != nil { |
| log.Fatalf("Unable to unmarshal artifact information from json string %v", info) |
| } |
| |
| // For now we only expect artifacts of file type. This check should be revisited if that assumption no longer holds. |
| if artifactInformation.GetTypeUrn() != standardArtifactFileTypeUrn { |
| log.Fatalf("Expect file artifact type in setup only mode, found %v.", artifactInformation.GetTypeUrn()) |
| } |
| filePayload := &pipepb.ArtifactFilePayload{} |
| if err := proto.Unmarshal(artifactInformation.GetTypePayload(), filePayload); err != nil { |
| log.Fatal("Unable to unmarshal artifact information type payload.") |
| } |
| files[i] = filePayload.GetPath() |
| } |
| if setupErr := installSetupPackages(context.Background(), nil, files, workDir, []string{requirementsFile}); setupErr != nil { |
| log.Fatalf("Failed to install required packages: %v", setupErr) |
| } |
| } |
| |
| // logRuntimeDependencies logs the Python dependencies |
| // installed in the runtime environment. |
| func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger) error { |
| pythonVersion, err := expansionx.GetPythonVersion() |
| if err != nil { |
| return err |
| } |
| bufLogger.Printf(ctx, "Using Python version:") |
| args := []string{"--version"} |
| if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { |
| bufLogger.FlushAtError(ctx) |
| } else { |
| bufLogger.FlushAtDebug(ctx) |
| } |
| bufLogger.Printf(ctx, "Logging runtime dependencies:") |
| args = []string{"-m", "pip", "freeze"} |
| if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { |
| bufLogger.FlushAtError(ctx) |
| } else { |
| bufLogger.FlushAtDebug(ctx) |
| } |
| return nil |
| } |
| |
| // logSubmissionEnvDependencies logs the Python dependencies |
| // installed in the submission environment. |
| func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.BufferedLogger, dir string) error { |
| bufLogger.Printf(ctx, "Logging submission environment dependencies:") |
| // path for submission environment dependencies should match with the |
| // one defined in apache_beam/runners/portability/stager.py. |
| filename := filepath.Join(dir, "submission_environment_dependencies.txt") |
| content, err := os.ReadFile(filename) |
| if err != nil { |
| return err |
| } |
| bufLogger.Printf(ctx, "%s", string(content)) |
| return nil |
| } |