// blob: f7c090dc59bcb51eaa4b345deea347ef9d557791 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package jobopts contains shared options for job submission. These options
// are exposed to allow user code to inspect and modify them.
package jobopts
import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"
"sync/atomic"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/go/pkg/beam/log"
)
// Command-line flags shared by job submission code. Each variable is the
// pointer returned by the flag package, so it holds the flag's default until
// flag.Parse has run and may be read or overwritten by user code afterwards.
var (
	// Endpoint is the address of the job service to submit to.
	Endpoint = flag.String("endpoint", "", "Job service endpoint (required).")

	// JobName is the user-chosen name of the job; empty means none was given.
	JobName = flag.String("job_name", "", "Job name (optional).")

	// EnvironmentType selects the kind of environment that runs the user
	// code. It defaults to DOCKER.
	EnvironmentType = flag.String(
		"environment_type",
		"DOCKER",
		"Environment Type. Possible options are DOCKER and PROCESS.",
	)

	// EnvironmentConfig carries the configuration for the selected
	// environment type: a docker image URL for DOCKER, or a JSON description
	// of the process to launch for PROCESS.
	EnvironmentConfig = flag.String(
		"environment_config",
		"",
		"Set environment configuration for running the user code.\n"+
			"For DOCKER: Url for the docker image.\n"+
			`For PROCESS: json of the form {"os": "<OS>", "arch": "<ARCHITECTURE>", `+
			`"command": "<process to execute>", "env":{"<Environment variables 1>": "<ENV_VAL>"} }. `+
			"All fields in the json are optional except command.",
	)

	// WorkerBinary is the path of a pre-compiled worker binary. When it is
	// left empty the binary is produced via go build.
	WorkerBinary = flag.String("worker_binary", "", "Worker binary (optional)")

	// Experiments is a comma-separated list of experimental runner features
	// to enable.
	Experiments = flag.String("experiments", "", "Comma-separated list of experiments (optional).")

	// Async, when set, requests that submission not wait for the job to
	// complete.
	Async = flag.Bool("async", false, "Do not wait for job completion.")

	// Strict enables additional validation of user pipelines before they are
	// executed, so that invalid pipelines fail early.
	Strict = flag.Bool("beam_strict", false, "Apply additional validation to pipelines.")
)
// GetEndpoint returns the job service endpoint given by the --endpoint flag.
// If the flag is empty it returns an empty string and a non-nil error telling
// the user how to set it; it does not exit the process. Callers that have
// their own sensible default can read Endpoint directly instead.
// Convenience function.
func GetEndpoint() (string, error) {
	endpoint := *Endpoint
	if len(endpoint) == 0 {
		return "", errors.New("no job service endpoint specified. Use --endpoint=<endpoint>")
	}
	return endpoint, nil
}
// unique is a process-wide counter, advanced atomically, that keeps
// autogenerated job names distinct from one another.
var unique int32

// GetJobName returns the job name given by the --job_name flag. When the flag
// is empty it returns a freshly generated name of the form
// "go-job-<counter>-<unix nanoseconds>", so every call without an explicit
// name yields a different value. Convenience function.
func GetJobName() string {
	if name := *JobName; name != "" {
		return name
	}
	seq := atomic.AddInt32(&unique, 1)
	return fmt.Sprintf("go-job-%v-%v", seq, time.Now().UnixNano())
}
// GetEnvironmentUrn returns the URN of the environment used to run the SDK
// harness, chosen by the --environment_type flag. The flag value is matched
// case-insensitively: "process" yields "beam:env:process:v1" and "docker"
// yields "beam:env:docker:v1". Any other value, including the empty string,
// is logged and falls back to the docker URN. Convenience function.
func GetEnvironmentUrn(ctx context.Context) string {
	const dockerUrn = "beam:env:docker:v1"

	switch strings.ToLower(*EnvironmentType) {
	case "process":
		return "beam:env:process:v1"
	case "docker":
		return dockerUrn
	default:
		// The flag itself defaults to DOCKER, so this branch is only reached
		// for an empty or unrecognized value. Report the value that was
		// rejected instead of presenting it as the default.
		log.Infof(ctx, "Unsupported environment type '%v'. Using default environment: 'DOCKER'", *EnvironmentType)
		return dockerUrn
	}
}
// GetEnvironmentConfig returns the environment configuration given by the
// --environment_config flag. When the flag is empty, the default image
// "apachebeam/go_sdk:latest" is stored back into EnvironmentConfig, logged,
// and returned, so later reads of the flag see the default as well.
// Convenience function.
func GetEnvironmentConfig(ctx context.Context) string {
	if cfg := *EnvironmentConfig; cfg != "" {
		return cfg
	}

	// The default contains no $-references, so ExpandEnv currently returns it
	// unchanged.
	*EnvironmentConfig = os.ExpandEnv("apachebeam/go_sdk:latest")
	log.Infof(ctx, "No environment config specified. Using default config: '%v'", *EnvironmentConfig)
	return *EnvironmentConfig
}
// GetExperiments returns the experiments named by the --experiments flag,
// splitting the flag value on commas. Whitespace around each entry is trimmed
// and blank entries (for example from a trailing or doubled comma) are
// dropped, since neither can name an experiment. It returns nil when the flag
// is unset or contains no non-blank entries.
func GetExperiments() []string {
	if *Experiments == "" {
		return nil
	}

	var experiments []string
	for _, entry := range strings.Split(*Experiments, ",") {
		if name := strings.TrimSpace(entry); name != "" {
			experiments = append(experiments, name)
		}
	}
	return experiments
}