blob: 3b6968fad28febba83a1f7df04aa3352a0d7c7a9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package jobopts contains shared options for job submission. These options
// are exposed to allow user code to inspect and modify them.
package jobopts
import (
// Command-line flags shared by job submission code. Each variable is the
// pointer returned by the flag package, so its value is only meaningful after
// flag.Parse has run; user code may also assign through the pointer.
var (
	// Endpoint is the job service endpoint.
	Endpoint = flag.String("endpoint", "", "Job service endpoint (required).")

	// JobName is the name of the job.
	JobName = flag.String("job_name", "", "Job name (optional).")

	// EnvironmentType is the environment type to run the user code.
	EnvironmentType = flag.String("environment_type", "DOCKER",
		"Environment Type. Possible options are DOCKER, PROCESS, LOOPBACK, and EXTERNAL.")

	// EnvironmentConfig is the environment configuration for running the user code.
	EnvironmentConfig = flag.String("environment_config",
		"",
		"Set environment configuration for running the user code.\n"+
			"For DOCKER: Url for the docker image.\n"+
			"For PROCESS: json of the form {\"os\": \"<OS>\", "+
			"\"arch\": \"<ARCHITECTURE>\", \"command\": \"<process to execute>\", "+
			"\"env\":{\"<Environment variables 1>\": \"<ENV_VAL>\"} }. "+
			"All fields in the json are optional except command.")

	// WorkerBinary is the location of the compiled worker binary. If not
	// specified, the binary is produced via go build.
	WorkerBinary = flag.String("worker_binary", "", "Worker binary (optional)")

	// Experiments toggle experimental features in the runner.
	Experiments = flag.String("experiments", "", "Comma-separated list of experiments (optional).")

	// Async determines whether to wait for job completion.
	Async = flag.Bool("async", false, "Do not wait for job completion.")

	// Strict mode applies additional validation to user pipelines before
	// executing them and fails early if the pipelines don't pass.
	Strict = flag.Bool("beam_strict", false, "Apply additional validation to pipelines.")

	// RetainDockerContainers determines whether docker containers created by
	// the runner are retained. If false, then containers are deleted once the
	// job ends, even if it failed.
	RetainDockerContainers = flag.Bool("retain_docker_containers", false, "Retain Docker containers created by the runner.")
)
// GetEndpoint returns the endpoint, if non empty and exits otherwise. Runners
// such as Dataflow set a reasonable default. Convenience function.
func GetEndpoint() (string, error) {
if *Endpoint == "" {
return "", errors.New("no job service endpoint specified. Use --endpoint=<endpoint>")
return *Endpoint, nil
var unique int32
// GetJobName returns the specified job name or, if not present, a fresh
// autogenerated name. Convenience function.
func GetJobName() string {
if *JobName == "" {
id := atomic.AddInt32(&unique, 1)
return fmt.Sprintf("go-job-%v-%v", id, time.Now().UnixNano())
return *JobName
// GetEnvironmentUrn returns the specified EnvironmentUrn used to run the SDK Harness,
// if not present, returns the docker environment urn "beam:env:docker:v1".
// Convenience function.
func GetEnvironmentUrn(ctx context.Context) string {
switch env := strings.ToLower(*EnvironmentType); env {
case "process":
return "beam:env:process:v1"
case "loopback", "external":
return "beam:env:external:v1"
case "docker":
return "beam:env:docker:v1"
log.Infof(ctx, "No environment type specified. Using default environment: '%v'", *EnvironmentType)
return "beam:env:docker:v1"
// IsLoopback returns whether the EnvironmentType is loopback.
func IsLoopback() bool {
return strings.ToLower(*EnvironmentType) == "loopback"
// GetEnvironmentConfig returns the specified configuration for specified SDK Harness,
// if not present, the default development container for the current user.
// Convenience function.
func GetEnvironmentConfig(ctx context.Context) string {
if *EnvironmentConfig == "" {
*EnvironmentConfig = os.ExpandEnv("apache/beam_go_sdk:latest")
log.Infof(ctx, "No environment config specified. Using default config: '%v'", *EnvironmentConfig)
return *EnvironmentConfig
// GetExperiments returns the experiments.
func GetExperiments() []string {
if *Experiments == "" {
return nil
return strings.Split(*Experiments, ",")