| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // boot is the boot code for the Java SDK harness container. It is responsible |
| // for retrieving staged files and invoking the JVM correctly. |
| package main |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "flag" |
| "fmt" |
| "log" |
| "os" |
| "path/filepath" |
| "sort" |
| "strconv" |
| "strings" |
| |
| "github.com/apache/beam/sdks/v2/go/container/tools" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" |
| pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/util/syscallx" |
| ) |
| |
var (
	// Contract: https://s.apache.org/beam-fn-api-container-contract.
	//
	// NOTE: endpoint flags act as fallbacks; values supplied by the
	// provision service take precedence (see main).

	id                = flag.String("id", "", "Local identifier (required).")
	loggingEndpoint   = flag.String("logging_endpoint", "", "Logging endpoint (required).")
	artifactEndpoint  = flag.String("artifact_endpoint", "", "Artifact endpoint (required).")
	provisionEndpoint = flag.String("provision_endpoint", "", "Provision endpoint (required).")
	controlEndpoint   = flag.String("control_endpoint", "", "Control endpoint (required).")
	semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
)
| |
const (
	// Pipeline-option markers, matched as substrings of the JSON-encoded
	// pipeline options in main.
	disableJammAgentOption              = "disable_jamm_agent"
	enableGoogleCloudProfilerOption     = "enable_google_cloud_profiler"
	enableGoogleCloudHeapSamplingOption = "enable_google_cloud_heap_sampling"
	// Cloud Profiler agent argument template; the two %s placeholders are
	// the profiler service name and the service version (the job id).
	googleCloudProfilerAgentBaseArgs = "-agentpath:/opt/google_cloud_profiler/profiler_java_agent.so=-logtostderr,-cprof_service=%s,-cprof_service_version=%s"
	// Same as the base arguments, plus heap sampling every 2 MiB (2097152 bytes).
	googleCloudProfilerAgentHeapArgs = googleCloudProfilerAgentBaseArgs + ",-cprof_enable_heap_sampling,-cprof_heap_sampling_interval=2097152"
	// Jamm agent bundled in the image; used for object size measurement.
	jammAgentArgs = "-javaagent:/opt/apache/beam/jars/jamm.jar"
)
| |
// main implements the container contract
// (https://s.apache.org/beam-fn-api-container-contract): obtain provisioning
// information, materialize staged artifacts, assemble the JVM command line
// (heap size, agents, classpath, system properties, module flags), and exec
// the Java SDK harness. Any unrecoverable setup failure is fatal.
func main() {
	flag.Parse()
	if *id == "" {
		log.Fatal("No id provided.")
	}
	if *provisionEndpoint == "" {
		log.Fatal("No provision endpoint provided.")
	}

	// Tag all subsequent outgoing gRPC calls with the worker ID.
	ctx := grpcx.WriteWorkerID(context.Background(), *id)

	info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
	if err != nil {
		log.Fatalf("Failed to obtain provisioning information: %v", err)
	}
	log.Printf("Provision info:\n%v", info)

	// TODO(BEAM-8201): Simplify once flags are no longer used.
	// Endpoints returned by the provision service override the flag values.
	if info.GetLoggingEndpoint().GetUrl() != "" {
		*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
	}
	if info.GetArtifactEndpoint().GetUrl() != "" {
		*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
	}
	if info.GetControlEndpoint().GetUrl() != "" {
		*controlEndpoint = info.GetControlEndpoint().GetUrl()
	}

	if *loggingEndpoint == "" {
		log.Fatal("No logging endpoint provided.")
	}
	if *artifactEndpoint == "" {
		log.Fatal("No artifact endpoint provided.")
	}
	if *controlEndpoint == "" {
		log.Fatal("No control endpoint provided.")
	}
	// From here on, log through the runner's logging endpoint rather than
	// the local standard logger.
	logger := &tools.Logger{Endpoint: *loggingEndpoint}

	logger.Printf(ctx, "Initializing java harness: %v", strings.Join(os.Args, " "))

	// (1) Obtain the pipeline options
	options, err := tools.ProtoToJSON(info.GetPipelineOptions())
	if err != nil {
		logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
	}

	// (2) Retrieve the staged user jars. We ignore any disk limit,
	// because the staged jars are mandatory.

	// Using the SDK Harness ID in the artifact destination path to make sure that dependencies used by multiple
	// SDK Harnesses in the same VM do not conflict. This is needed since some runners (for example, Dataflow)
	// may share the artifact staging directory across multiple SDK Harnesses
	// TODO(https://github.com/apache/beam/issues/20009): consider removing the SDK Harness ID from the staging path after Dataflow can properly
	// separate out dependencies per environment.
	dir := filepath.Join(*semiPersistDir, *id, "staged")

	artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
	if err != nil {
		logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err)
	}

	// (3) Invoke the Java harness, preserving artifact ordering in classpath.

	// Expose harness configuration to the child JVM process through
	// environment variables, per the container contract.
	os.Setenv("HARNESS_ID", *id)
	if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
		logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)
	}
	os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
	os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
	os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))

	if info.GetStatusEndpoint() != nil {
		os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", info.GetStatusEndpoint().String())
	}

	const jarsDir = "/opt/apache/beam/jars"
	const javaHarnessJar = "beam-sdks-java-harness.jar"
	// Default slf4j-based logging stack bundled with the container image.
	defaultLoggingJars := []string{
		"slf4j-api.jar",
		"slf4j-jdk14.jar",
		"jcl-over-slf4j.jar",
		"log4j-over-slf4j.jar",
		"log4j-to-slf4j.jar",
	}
	cp := []string{}
	if strings.Contains(options, "use_custom_logging_libraries") {
		// In this case, the logging libraries will be provided from the staged
		// artifacts.
		logger.Warnf(ctx, "Skipping default slf4j dependencies in classpath")
	} else {
		logger.Printf(ctx, "Using default slf4j dependencies in classpath")
		for _, jar := range defaultLoggingJars {
			cp = append(cp, filepath.Join(jarsDir, jar))
		}
	}
	var hasWorkerExperiment = strings.Contains(options, "use_staged_dataflow_worker_jar")

	if hasWorkerExperiment {
		// Skip adding system "beam-sdks-java-harness.jar". User-provided jar will
		// be added to classpath as a normal user jar further below.
		logger.Printf(ctx, "Opted to use staged java harness. Make sure beam-sdks-java-harness is included or shaded in the staged jars.")
	} else {
		cp = append(cp, filepath.Join(jarsDir, javaHarnessJar))
	}

	// Append staged artifacts to the classpath in their original order.
	for _, a := range artifacts {
		name, _ := artifact.MustExtractFilePayload(a)
		if hasWorkerExperiment {
			// The staged harness is expected to replace dataflow-worker.jar.
			if name == "dataflow-worker.jar" {
				continue
			}
		}
		cp = append(cp, filepath.Join(dir, filepath.FromSlash(name)))
	}

	// Choose the JVM max heap: a fixed 32GB when the set_recommended_max_xmx
	// experiment is set, otherwise derived from physical memory via
	// HeapSizeLimit (which falls back to 1GB when size is unknown).
	var lim uint64
	if strings.Contains(options, "set_recommended_max_xmx") {
		lim = 32 << 30
	} else {
		size, err := syscallx.PhysicalMemorySize()
		if err != nil {
			// Physical memory undetermined; signal HeapSizeLimit to use its fallback.
			size = 0
		}
		lim = HeapSizeLimit(size)
	}

	args := []string{
		"-Xmx" + strconv.FormatUint(lim, 10),
		// ParallelGC the most adequate for high throughput and lower CPU utilization
		// It is the default GC in Java 8, but not on newer versions
		"-XX:+UseParallelGC",
		"-XX:+AlwaysActAsServerClassMachine",
		"-XX:-OmitStackTraceInFastThrow",
	}

	enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
	enableGoogleCloudHeapSampling := strings.Contains(options, enableGoogleCloudHeapSamplingOption)
	if enableGoogleCloudProfiler {
		metadata := info.GetMetadata()
		profilerServiceName := ExtractProfilerServiceName(options, metadata)

		if profilerServiceName != "" {
			// The job id is passed as the profiler service version.
			if jobId, idExists := metadata["job_id"]; idExists {
				if enableGoogleCloudHeapSampling {
					args = append(args, fmt.Sprintf(googleCloudProfilerAgentHeapArgs, profilerServiceName, jobId))
				} else {
					args = append(args, fmt.Sprintf(googleCloudProfilerAgentBaseArgs, profilerServiceName, jobId))
				}
				logger.Printf(ctx, "Turning on Cloud Profiling. Profile heap: %t, service: %s", enableGoogleCloudHeapSampling, profilerServiceName)
			} else {
				logger.Printf(ctx, "job_id is missing from metadata. Cannot enable profiling.")
			}
		}
	}

	disableJammAgent := strings.Contains(options, disableJammAgentOption)
	if disableJammAgent {
		logger.Printf(ctx, "Disabling Jamm agent. Measuring object size will be inaccurate.")
	} else {
		args = append(args, jammAgentArgs)
	}

	// If heap dumping is enabled, configure the JVM to dump it on oom events.
	if pipelineOptions, ok := info.GetPipelineOptions().GetFields()["options"]; ok {
		if heapDumpOption, ok := pipelineOptions.GetStructValue().GetFields()["enableHeapDumps"]; ok {
			if heapDumpOption.GetBoolValue() {
				args = append(args, "-XX:+HeapDumpOnOutOfMemoryError",
					"-Dbeam.fn.heap_dump_dir="+filepath.Join(dir, "heapdumps"),
					"-XX:HeapDumpPath="+filepath.Join(dir, "heapdumps", "heap_dump.hprof"))
			}
		}
	}

	// Apply meta options shipped with the image (see LoadMetaOptions).
	const metaDir = "/opt/apache/beam/options"

	// Meta-option load failures are logged but deliberately non-fatal.
	metaOptions, err := LoadMetaOptions(ctx, logger, metaDir)
	if err != nil {
		logger.Errorf(ctx, "LoadMetaOptions failed: %v", err)
	}

	javaOptions := BuildOptions(ctx, logger, metaOptions)
	// (3a) Add custom jvm arguments: "-server -Xmx1324 -XXfoo .."
	args = append(args, javaOptions.JavaArguments...)

	// (3b) Add classpath: "-cp foo.jar:bar.jar:.."
	if len(javaOptions.Classpath) > 0 {
		cp = append(cp, javaOptions.Classpath...)
	}
	// Wrap the classpath in a pathing jar — presumably to keep the command
	// line short; see makePathingJar (defined elsewhere) for details.
	pathingjar, err := makePathingJar(cp)
	if err != nil {
		logger.Fatalf(ctx, "makePathingJar failed: %v", err)
	}
	args = append(args, "-cp")
	args = append(args, pathingjar)

	// (3c) Add (sorted) properties: "-Dbar=baz -Dfoo=bar .."
	var properties []string
	for key, value := range javaOptions.Properties {
		properties = append(properties, fmt.Sprintf("-D%s=%s", key, value))
	}
	// Sort for a deterministic command line (map iteration order is random).
	sort.Strings(properties)
	args = append(args, properties...)

	if pipelineOptions, ok := info.GetPipelineOptions().GetFields()["options"]; ok {
		// Open modules specified in pipeline options
		if modules, ok := pipelineOptions.GetStructValue().GetFields()["jdkAddOpenModules"]; ok {
			for _, module := range modules.GetListValue().GetValues() {
				args = append(args, "--add-opens="+module.GetStringValue())
			}
		}
		// Add modules specified in pipeline options
		if modules, ok := pipelineOptions.GetStructValue().GetFields()["jdkAddRootModules"]; ok {
			for _, module := range modules.GetListValue().GetValues() {
				args = append(args, "--add-modules="+module.GetStringValue())
			}
		}
	}
	// Automatically open modules for Java 11+
	openModuleAgentJar := "/opt/apache/beam/jars/open-module-agent.jar"
	if _, err := os.Stat(openModuleAgentJar); err == nil {
		args = append(args, "-javaagent:"+openModuleAgentJar)
	}
	args = append(args, "org.apache.beam.fn.harness.FnHarness")
	logger.Printf(ctx, "Executing: java %v", strings.Join(args, " "))

	// Execute blocks until the JVM exits; reaching Fatalf means it exited.
	logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...))
}
| |
// HeapSizeLimit picks a JVM max-heap budget, in bytes, for a machine with
// the given physical memory size (in bytes). When the size is unknown
// (zero), it returns 1GB. Otherwise it returns max(70% of size, size - 32GB):
// an imperfect heuristic that keeps memory free for non-heap use and other
// overhead without underutilizing large machines. (The separate
// set_recommended_max_xmx experiment, handled in main, bypasses this and
// pins the heap at 32GB so the JVM can enable CompressedOops, which uses
// memory more efficiently and improves GC performance and cache hit rate.)
func HeapSizeLimit(size uint64) uint64 {
	const (
		fallback = 1 << 30  // heap to assume when physical memory is unknown
		headroom = 32 << 30 // maximum slack reserved for non-heap use
	)
	if size == 0 {
		return fallback
	}
	scaled := size * 70 / 100
	if size-scaled < headroom {
		return scaled
	}
	return size - headroom
}
| |
// Options represents java VM invocation options in a simple,
// semi-structured way.
type Options struct {
	// JavaArguments are raw JVM arguments placed before the classpath,
	// e.g. "-Xmx1g" or agent flags.
	JavaArguments []string `json:"java_arguments,omitempty"`
	// Properties become "-Dkey=value" system properties (sorted before
	// being appended to the command line; see main).
	Properties map[string]string `json:"properties,omitempty"`
	// Classpath entries are appended to the harness classpath.
	Classpath []string `json:"classpath,omitempty"`
}
| |
// MetaOption represents a jvm environment transformation or setup
// that the launcher employs. The aim is to keep the service-side and
// user-side required configuration simple and minimal, yet allow
// numerous execution tweaks. Most tweaks are enabled by default and
// require no input. Some setups, such as Cloud Debugging, are opt-in.
//
// Meta-options are usually included with the image and use supporting
// files, usually jars. A few are intrinsic because they require
// additional input or complex computations, such as Cloud Debugging
// and Cloud Profiling. Meta-options can be enabled or disabled by
// name. For the most part, the meta-option names are not guaranteed
// to be backwards compatible or stable. They are rather knobs that
// can be tuned if some well-intended transformation cause trouble for
// a customer. For tweaks, the expectation is that the default is
// almost always correct.
//
// Meta-options are simple additive manipulations applied in priority
// order (applied low to high) to allow jvm customization by adding
// files, notably enabling customization by later docker layers. The
// override semantics is prepend for lists and simple overwrite
// otherwise. A common use case is adding a jar to the beginning of
// the classpath, such as the shuffle or windmill jni jar, or adding
// an agent.
type MetaOption struct {
	// Name identifies the meta-option (used in log messages and warnings).
	Name string `json:"name,omitempty"`
	// Description is human-readable documentation; not interpreted.
	Description string `json:"description,omitempty"`
	// Enabled controls whether the option is applied (disabled options
	// are skipped entirely; see BuildOptions).
	Enabled bool `json:"enabled,omitempty"`
	// Priority orders application; higher priorities are applied first.
	Priority int `json:"priority,omitempty"`
	// Options holds the JVM arguments, properties, and classpath to merge.
	Options Options `json:"options"`
}
| |
// byPriority sorts MetaOptions by priority, highest first. It implements
// sort.Interface for use with sort.Sort in BuildOptions.
type byPriority []*MetaOption

func (f byPriority) Len() int      { return len(f) }
func (f byPriority) Swap(i, j int) { f[i], f[j] = f[j], f[i] }

// Less orders descending by Priority, so higher-priority options come first.
func (f byPriority) Less(i, j int) bool { return f[i].Priority > f[j].Priority }
| |
| // LoadMetaOptions scans the directory tree for meta-option metadata |
| // files and loads them. Any regular file named "option-XX.json" is |
| // strictly assumed to be a meta-option file. This strictness allows |
| // us to fail hard if such a file cannot be parsed. |
| // |
| // Loading meta-options from disk allows extra files and their |
| // configuration be kept together and defined externally. |
| func LoadMetaOptions(ctx context.Context, logger *tools.Logger, dir string) ([]*MetaOption, error) { |
| var meta []*MetaOption |
| |
| worker := func(path string, info os.FileInfo, err error) error { |
| if err != nil { |
| return err |
| } |
| if !info.Mode().IsRegular() { |
| return nil |
| } |
| if !strings.HasPrefix(info.Name(), "option-") { |
| return nil |
| } |
| if !strings.HasSuffix(info.Name(), ".json") { |
| return nil |
| } |
| |
| content, err := os.ReadFile(path) |
| if err != nil { |
| return err |
| } |
| |
| var option MetaOption |
| if err := json.Unmarshal(content, &option); err != nil { |
| return fmt.Errorf("failed to parse %s: %v", path, err) |
| } |
| |
| logger.Printf(ctx, "Loaded meta-option '%s'", option.Name) |
| |
| meta = append(meta, &option) |
| return nil |
| } |
| |
| if err := filepath.Walk(dir, worker); err != nil { |
| return nil, err |
| } |
| return meta, nil |
| } |
| |
| func BuildOptions(ctx context.Context, logger *tools.Logger, metaOptions []*MetaOption) *Options { |
| options := &Options{Properties: make(map[string]string)} |
| |
| sort.Sort(byPriority(metaOptions)) |
| for _, meta := range metaOptions { |
| if !meta.Enabled { |
| continue |
| } |
| |
| // Rightmost takes precedence |
| options.JavaArguments = append(meta.Options.JavaArguments, options.JavaArguments...) |
| |
| for key, value := range meta.Options.Properties { |
| _, exists := options.Properties[key] |
| if !exists { |
| options.Properties[key] = value |
| } else { |
| logger.Warnf(ctx, "Warning: %s property -D%s=%s was redefined", meta.Name, key, value) |
| } |
| } |
| |
| options.Classpath = append(options.Classpath, meta.Options.Classpath...) |
| } |
| return options |
| } |
| |
// ExtractProfilerServiceName determines the Cloud Profiler service name.
// It first looks for an "enable_google_cloud_profiler=<name>" entry inside
// the "dataflowServiceOptions" display data of the JSON-encoded pipeline
// options; if none is found, it falls back to the "job_name" entry of the
// provisioning metadata. It returns "" when no name can be determined
// (including when options is not valid JSON or lacks display_data), in
// which case the caller skips enabling profiling.
func ExtractProfilerServiceName(options string, metadata map[string]string) string {
	const profilerKeyPrefix = "enable_google_cloud_profiler="

	var profilerServiceName string

	var parsed map[string]interface{}
	if err := json.Unmarshal([]byte(options), &parsed); err != nil {
		return ""
	}

	displayData, ok := parsed["display_data"].([]interface{})
	if !ok {
		return ""
	}

	for _, item := range displayData {
		entry, ok := item.(map[string]interface{})
		if !ok {
			continue
		}
		if entry["key"] != "dataflowServiceOptions" {
			continue
		}
		rawValue, ok := entry["value"].(string)
		if !ok {
			continue
		}
		// The value is rendered like "[opt1=a, opt2=b]"; strip the brackets
		// and scan the comma-separated entries for the profiler option.
		cleaned := strings.Trim(rawValue, "[]")
		for _, opt := range strings.Split(cleaned, ",") {
			opt = strings.TrimSpace(opt)
			if strings.HasPrefix(opt, profilerKeyPrefix) {
				if parts := strings.SplitN(opt, "=", 2); len(parts) == 2 {
					profilerServiceName = parts[1]
					break
				}
			}
		}
	}

	// Fallback to job_name from metadata.
	if profilerServiceName == "" {
		jobName, exists := metadata["job_name"]
		if !exists {
			// Bug fix: previously the error's message text itself was
			// returned here, which the caller then treated as a valid,
			// non-empty profiler service name. Log the problem and return
			// "" so profiling is skipped instead.
			err := errors.New("required job_name missing from metadata, profiling will not be enabled without it")
			log.Printf("%v", err)
			return ""
		}
		profilerServiceName = jobName
	}

	return profilerServiceName
}