| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package main |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "flag" |
| "fmt" |
| "io" |
| "log" |
| "os" |
| "path/filepath" |
| "strings" |
| "time" |
| |
| "github.com/apache/beam/sdks/v2/go/container/pool" |
| "github.com/apache/beam/sdks/v2/go/container/tools" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" |
| |
| // Import the GCS filesystem so that it can be used to upload heap dumps. |
| _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs" |
| fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" |
| pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" |
| ) |
| |
| var ( |
| // Contract: https://s.apache.org/beam-fn-api-container-contract. |
| |
| workerPool = flag.Bool("worker_pool", false, "Run as worker pool (optional).") |
| id = flag.String("id", "", "Local identifier (required).") |
| loggingEndpoint = flag.String("logging_endpoint", "", "Local logging endpoint for FnHarness (required).") |
| artifactEndpoint = flag.String("artifact_endpoint", "", "Local artifact endpoint for FnHarness (required).") |
| provisionEndpoint = flag.String("provision_endpoint", "", "Local provision endpoint for FnHarness (required).") |
| controlEndpoint = flag.String("control_endpoint", "", "Local control endpoint for FnHarness (required).") |
| semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).") |
| ) |
| |
| const ( |
| cloudProfilingJobName = "CLOUD_PROF_JOB_NAME" |
| cloudProfilingJobID = "CLOUD_PROF_JOB_ID" |
| enableGoogleCloudProfilerOption = "enable_google_cloud_profiler" |
| workerPoolIdEnv = "BEAM_GO_WORKER_POOL_ID" |
| ) |
| |
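| // configureGoogleCloudProfilerEnvVars derives the Cloud Profiler service |
| // name and job ID from the pipeline options and provision metadata, and |
| // exports them via the CLOUD_PROF_JOB_NAME and CLOUD_PROF_JOB_ID |
| // environment variables for the worker binary to pick up. |
| // |
| // The options JSON is expected to carry the service option roughly as |
| // follows (the service name "my-service" is purely illustrative): |
| // |
| //   { |
| //     "beam:option:go_options:v1": { |
| //       "options": { |
| //         "dataflow_service_options": "enable_google_cloud_profiler=my-service" |
| //       } |
| //     } |
| //   } |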
| func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger *tools.Logger, metadata map[string]string, options string) error { |
| const profilerKey = "enable_google_cloud_profiler=" |
| |
| var parsed map[string]interface{} |
| if err := json.Unmarshal([]byte(options), &parsed); err != nil { |
| return fmt.Errorf("failed to parse pipeline options JSON: %w", err) |
| } |
| |
| var profilerServiceName string |
| |
| // Try to read the profiler service name from |
| // "beam:option:go_options:v1" -> "options" -> "dataflow_service_options". |
| if goOpts, ok := parsed["beam:option:go_options:v1"].(map[string]interface{}); ok { |
| if optionsMap, ok := goOpts["options"].(map[string]interface{}); ok { |
| if profilerServiceNameRaw, ok := optionsMap["dataflow_service_options"].(string); ok { |
| if strings.HasPrefix(profilerServiceNameRaw, profilerKey) { |
| profilerServiceName = strings.TrimPrefix(profilerServiceNameRaw, profilerKey) |
| } |
| } |
| } |
| } |
| |
| // Fall back to the job_name from metadata. |
| if profilerServiceName == "" { |
| if jobName, jobNameExists := metadata["job_name"]; jobNameExists { |
| profilerServiceName = jobName |
| } else { |
| return errors.New("required job_name missing from metadata, profiling will not be enabled without it") |
| } |
| } |
| |
| jobID, idExists := metadata["job_id"] |
| if !idExists { |
| return errors.New("required job_id missing from metadata, profiling will not be enabled without it") |
| } |
| |
| os.Setenv(cloudProfilingJobName, profilerServiceName) |
| os.Setenv(cloudProfilingJobID, jobID) |
| logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job ID: %v", profilerServiceName, jobID) |
| return nil |
| } |
| |
| func main() { |
| flag.Parse() |
| |
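| // Worker pool mode: instead of bootstrapping a single harness, serve an |
| // SDK worker pool on port 50000 that launches workers from this same binary. |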
| if *workerPool { |
| workerPoolId := fmt.Sprintf("%d", os.Getpid()) |
| bin, err := os.Executable() |
| if err != nil { |
| log.Fatalf("Error starting worker pool, couldn't find boot loader path: %v", err) |
| } |
| |
| os.Setenv(workerPoolIdEnv, workerPoolId) |
| log.Printf("Starting worker pool %v: Go %v binary: %vv", workerPoolId, ":50000", bin) |
| |
| ctx := context.Background() |
| server, err := pool.New(ctx, 50000, bin) |
| if err != nil { |
| log.Fatalf("Error starting worker pool: %v", err) |
| } |
| defer server.Stop(ctx) |
| if err := server.ServeAndWait(); err != nil { |
| log.Fatalf("Error with worker pool: %v", err) |
| } |
| log.Print("Go SDK worker pool exited.") |
| os.Exit(0) |
| } |
| |
| if *id == "" { |
| log.Fatal("No id provided.") |
| } |
| if *provisionEndpoint == "" { |
| log.Fatal("No provision endpoint provided.") |
| } |
| |
| ctx := grpcx.WriteWorkerID(context.Background(), *id) |
| |
| info, err := tools.ProvisionInfo(ctx, *provisionEndpoint) |
| if err != nil { |
| log.Fatalf("Failed to obtain provisioning information: %v", err) |
| } |
| log.Printf("Provision info:\n%v", info) |
| |
| err = ensureEndpointsSet(info) |
| if err != nil { |
| log.Fatalf("Endpoint not set: %v", err) |
| } |
| logger := &tools.Logger{Endpoint: *loggingEndpoint} |
| logger.Printf(ctx, "Initializing Go harness: %v", strings.Join(os.Args, " ")) |
| |
| // (1) Obtain the pipeline options |
| |
| options, err := tools.ProtoToJSON(info.GetPipelineOptions()) |
| if err != nil { |
| logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) |
| } |
| |
| // (2) Retrieve the staged files. |
| // |
| // The Go SDK harness downloads the worker binary and invokes |
| // it. If more than one artifact is staged, the binary is identified |
| // by the Go worker binary role URN, or by the legacy name "worker". |
| |
| dir := filepath.Join(*semiPersistDir, "staged") |
| artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir) |
| if err != nil { |
| logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err) |
| } |
| |
| name, err := getGoWorkerArtifactName(ctx, logger, artifacts) |
| if err != nil { |
| logger.Fatalf(ctx, "Failed to get Go Worker Artifact Name: %v", err) |
| } |
| |
| // (3) The persist dir may be on a noexec volume, so we must |
| // copy the binary to a different location to execute. |
| tmpPrefix, err := os.MkdirTemp("/tmp/", "bin*") |
| if err != nil { |
| logger.Fatalf(ctx, "Failed to copy worker binary: %v", err) |
| } |
| |
| prog := tmpPrefix + "/worker" |
| logger.Printf(ctx, "From: %q To:%q", filepath.Join(dir, name), prog) |
| if err := copyExe(filepath.Join(dir, name), prog); err != nil { |
| logger.Fatalf(ctx, "Failed to copy worker binary: %v", err) |
| } |
| |
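| // Assemble the flags forwarded to the worker binary. |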
| args := []string{ |
| "--worker=true", |
| "--id=" + *id, |
| "--logging_endpoint=" + *loggingEndpoint, |
| "--control_endpoint=" + *controlEndpoint, |
| "--semi_persist_dir=" + *semiPersistDir, |
| } |
| if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil { |
| logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err) |
| } |
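| // Expose optional provision info (status endpoint, runner capabilities) |
| // to the worker via environment variables. |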
| if info.GetStatusEndpoint() != nil { |
| os.Setenv("STATUS_ENDPOINT", info.GetStatusEndpoint().GetUrl()) |
| } |
| |
| if len(info.GetRunnerCapabilities()) > 0 { |
| os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " ")) |
| } |
| |
| enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption) |
| if enableGoogleCloudProfiler { |
| err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.Metadata, options) |
| if err != nil { |
| logger.Printf(ctx, "could not configure Google Cloud Profiler variables, got %v", err) |
| } |
| } |
| |
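| // (4) Execute the worker binary and wait for it to exit. On failure, try |
| // to upload a heap profile to temp_location (when configured) to aid |
| // diagnosis before the boot process terminates. |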
| err = execx.Execute(prog, args...) |
| if err != nil { |
| var opt runtime.RawOptionsWrapper |
| err := json.Unmarshal([]byte(options), &opt) |
| if err == nil { |
| if tempLocation, ok := opt.Options.Options["temp_location"]; ok { |
| diagnostics.UploadHeapProfile(ctx, fmt.Sprintf("%v/heapProfiles/profile-%v-%d", strings.TrimSuffix(tempLocation, "/"), *id, time.Now().Unix())) |
| } |
| } |
| } |
| |
| logger.Fatalf(ctx, "User program exited: %v", err) |
| } |
| |
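| // getGoWorkerArtifactName returns the file name of the staged Go worker |
| // binary. A single staged artifact is used as-is; with multiple artifacts, |
| // the one carrying the Go worker binary role URN is preferred, falling |
| // back to the legacy artifact named "worker". |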
| func getGoWorkerArtifactName(ctx context.Context, logger *tools.Logger, artifacts []*pipepb.ArtifactInformation) (string, error) { |
| const worker = "worker" |
| name := worker |
| |
| switch len(artifacts) { |
| case 0: |
| return "", errors.New("no artifacts staged") |
| case 1: |
| name, _ = artifact.MustExtractFilePayload(artifacts[0]) |
| return name, nil |
| default: |
| for _, a := range artifacts { |
| if a.GetRoleUrn() == artifact.URNGoWorkerBinaryRole { |
| name, _ = artifact.MustExtractFilePayload(a) |
| return name, nil |
| } |
| } |
| // TODO(https://github.com/apache/beam/issues/21459): Remove legacy hack once aged out. |
| for _, a := range artifacts { |
| n, _ := artifact.MustExtractFilePayload(a) |
| if n == worker { |
| logger.Printf(ctx, "Go worker binary found with legacy name '%v'", worker) |
| return n, nil |
| } |
| } |
| return "", fmt.Errorf("no artifact named '%v' found", worker) |
| } |
| } |
| |
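| // ensureEndpointsSet backfills the logging, artifact, and control endpoint |
| // flags from the provision info, and errors if any of them is still unset. |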
| func ensureEndpointsSet(info *fnpb.ProvisionInfo) error { |
| // TODO(BEAM-8201): Simplify once flags are no longer used. |
| if info.GetLoggingEndpoint().GetUrl() != "" { |
| *loggingEndpoint = info.GetLoggingEndpoint().GetUrl() |
| } |
| if info.GetArtifactEndpoint().GetUrl() != "" { |
| *artifactEndpoint = info.GetArtifactEndpoint().GetUrl() |
| } |
| if info.GetControlEndpoint().GetUrl() != "" { |
| *controlEndpoint = info.GetControlEndpoint().GetUrl() |
| } |
| |
| if *loggingEndpoint == "" { |
| return errors.New("no logging endpoint provided") |
| } |
| if *artifactEndpoint == "" { |
| return errors.New("no artifact endpoint provided") |
| } |
| if *controlEndpoint == "" { |
| return errors.New("no control endpoint provided") |
| } |
| |
| return nil |
| } |
| |
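| // copyExe copies the file at 'from' to 'to', creating parent directories as |
| // needed and marking the destination executable. |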
| func copyExe(from, to string) error { |
| src, err := os.Open(from) |
| if err != nil { |
| return err |
| } |
| defer src.Close() |
| |
| // Ensure that the folder path exists locally. |
| if err := os.MkdirAll(filepath.Dir(to), 0755); err != nil { |
| return err |
| } |
| |
| dst, err := os.OpenFile(to, os.O_WRONLY|os.O_CREATE, 0755) |
| if err != nil { |
| return err |
| } |
| |
| if _, err := io.Copy(dst, src); err != nil { |
| return err |
| } |
| return dst.Close() |
| } |