// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// boot is the boot code for the Python SDK harness container. It is responsible
// for retrieving and installing staged files and invoking python correctly.
package main
import (
pipepb ""
var (
	// acceptableWhlSpecs holds the wheel filename suffixes compatible with the
	// local Python interpreter. setupAcceptableWheelSpecs appends to it and
	// installSetupPackages passes it on to installSdk.
	acceptableWhlSpecs []string

	// setupOnly is used to invoke the boot sequence to only process the provided
	// artifacts and build new dependency pre-cached images.
	setupOnly = flag.Bool("setup_only", false, "Execute boot program in setup only mode (optional).")
	// artifacts is the path to the artifacts metadata file read in setup only mode.
	artifacts = flag.String("artifacts", "", "Path to artifacts metadata file used in setup only mode (optional).")

	// Contract: the flags below form the container's boot interface. main
	// rejects a missing id or provision endpoint, and requires the logging,
	// artifact and control endpoints unless provisioning supplies them.
	// NOTE(review): the reference that followed "Contract:" appears to have been
	// lost — presumably the Beam Fn API container contract; restore the link.
	workerPool        = flag.Bool("worker_pool", false, "Run as worker pool (optional).")
	id                = flag.String("id", "", "Local identifier (required).")
	loggingEndpoint   = flag.String("logging_endpoint", "", "Logging endpoint (required).")
	artifactEndpoint  = flag.String("artifact_endpoint", "", "Artifact endpoint (required).")
	provisionEndpoint = flag.String("provision_endpoint", "", "Provision endpoint (required).")
	controlEndpoint   = flag.String("control_endpoint", "", "Control endpoint (required).")
	semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
)
const (
	// sdkHarnessEntrypoint is the dotted Python module name of the SDK harness
	// entry point. NOTE(review): presumably passed to `python -m` by main; the
	// harness argument list is not fully visible — confirm.
	sdkHarnessEntrypoint = "apache_beam.runners.worker.sdk_worker_main"

	// Names of staged files that installSetupPackages looks for.
	// Please keep these names in sync with the Python SDK's staging code.
	// NOTE(review): the reference after "in sync with" appears truncated —
	// presumably apache_beam/runners/portability/stager.py; confirm.
	workflowFile      = "workflow.tar.gz"
	requirementsFile  = "requirements.txt"
	sdkSrcFile        = "dataflow_python_sdk.tar"
	extraPackagesFile = "extra_packages.txt"

	// standardArtifactFileTypeUrn is the only artifact type URN accepted in
	// setup only mode.
	standardArtifactFileTypeUrn = "beam:artifact:type:file:v1"
)
// main boots the Python SDK harness container. It has three paths:
//   - --setup_only: install the artifacts listed in --artifacts via
//     processArtifactsInSetupOnlyMode;
//   - --worker_pool: export this process's pid as the worker pool id through
//     the workerPoolIdEnv environment variable and run python via
//     execx.Execute;
//   - otherwise: validate the required flags, fetch provisioning info, install
//     the staged artifacts, export the harness configuration as environment
//     variables and run python via execx.Execute.
// Every failure, and the return of either python invocation, ends the process
// through log.Fatal/log.Fatalf.
//
// NOTE(review): several statements appear to be missing from this function as
// it stands: flag.Parse() is never called (so every flag keeps its default),
// there is no return after the --setup_only branch, both args slices have no
// visible elements, and the else branch of the worker-pool-id check is empty.
// Confirm against the upstream source before relying on this code.
func main() {
if *setupOnly {
if err := processArtifactsInSetupOnlyMode(); err != nil {
log.Fatalf("Setup unsuccessful with error: %v", err)
// NOTE(review): without a return here, a successful setup falls through to
// the worker start-up below.
if *workerPool == true {
// The pool id is this process's pid. It is exported so that processes started
// from this pool (which read workerPoolIdEnv further down) share one
// artifact installation through multiProcessExactlyOnce.
workerPoolId := fmt.Sprintf("%d", os.Getpid())
os.Setenv(workerPoolIdEnv, workerPoolId)
// NOTE(review): the worker-pool python arguments are not visible here.
args := []string{
log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " "))
log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))
if *id == "" {
log.Fatal("No id provided.")
if *provisionEndpoint == "" {
log.Fatal("No provision endpoint provided.")
// The worker id is attached to the context used for all provisioning and
// artifact calls below.
ctx := grpcx.WriteWorkerID(context.Background(), *id)
info, err := provision.Info(ctx, *provisionEndpoint)
if err != nil {
log.Fatalf("Failed to obtain provisioning information: %v", err)
log.Printf("Provision info:\n%v", info)
// Endpoints reported by the provisioning service override the corresponding
// command-line flags; the flags are only a fallback.
// TODO(BEAM-8201): Simplify once flags are no longer used.
if info.GetLoggingEndpoint().GetUrl() != "" {
*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
if info.GetArtifactEndpoint().GetUrl() != "" {
*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
if info.GetControlEndpoint().GetUrl() != "" {
*controlEndpoint = info.GetControlEndpoint().GetUrl()
// After the overrides, every endpoint must be known from one source or the other.
if *loggingEndpoint == "" {
log.Fatal("No logging endpoint provided.")
if *artifactEndpoint == "" {
log.Fatal("No artifact endpoint provided.")
if *controlEndpoint == "" {
log.Fatal("No control endpoint provided.")
log.Printf("Initializing python harness: %v", strings.Join(os.Args, " "))
// (1) Obtain the pipeline options
// They are converted to JSON and handed to python through PIPELINE_OPTIONS.
options, err := provision.ProtoToJSON(info.GetPipelineOptions())
if err != nil {
log.Fatalf("Failed to convert pipeline options: %v", err)
// (2) Retrieve and install the staged packages.
// Guard from concurrent artifact retrieval and installation,
// when called by child processes in a worker pool.
// materializeArtifactsFunc downloads the dependencies into
// <semi_persist_dir>/staged and installs them; any failure is fatal.
materializeArtifactsFunc := func() {
dir := filepath.Join(*semiPersistDir, "staged")
files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
if err != nil {
log.Fatalf("Failed to retrieve staged files: %v", err)
// TODO(herohde): the packages to install should be specified explicitly. It
// would also be possible to install the SDK in the Dockerfile.
fileNames := make([]string, len(files))
// The default requirements file is always tried; artifacts whose role is a
// pip requirements file are added after it.
requirementsFiles := []string{requirementsFile}
for i, v := range files {
name, _ := artifact.MustExtractFilePayload(v)
log.Printf("Found artifact: %s", name)
fileNames[i] = name
if v.RoleUrn == artifact.URNPipRequirementsFile {
requirementsFiles = append(requirementsFiles, name)
if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
log.Fatalf("Failed to install required packages: %v", setupErr)
// Inside a worker pool, installation runs once per pool id across processes.
workerPoolId := os.Getenv(workerPoolIdEnv)
if workerPoolId != "" {
multiProcessExactlyOnce(materializeArtifactsFunc, "beam.install.complete."+workerPoolId)
// NOTE(review): this branch has no visible statement; presumably it should
// call materializeArtifactsFunc() directly — confirm.
} else {
// (3) Invoke python
// The harness configuration is passed to the python process through
// environment variables; service descriptors use proto text format.
os.Setenv("WORKER_ID", *id)
os.Setenv("PIPELINE_OPTIONS", options)
os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}))
os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}))
os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))
// Optional values are exported only when provisioning supplies them.
if info.GetStatusEndpoint() != nil {
os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(info.GetStatusEndpoint()))
if metadata := info.GetMetadata(); metadata != nil {
if jobName, nameExists := metadata["job_name"]; nameExists {
os.Setenv("JOB_NAME", jobName)
if jobID, idExists := metadata["job_id"]; idExists {
os.Setenv("JOB_ID", jobID)
// NOTE(review): the harness python arguments are not visible here; presumably
// they reference sdkHarnessEntrypoint — confirm.
args := []string{
log.Printf("Executing: python %v", strings.Join(args, " "))
log.Fatalf("Python exited: %v", execx.Execute("python", args...))
// setup wheel specs according to installed python version
func setupAcceptableWheelSpecs() error {
cmd := exec.Command("python", "-V")
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
return err
re := regexp.MustCompile(`Python (\d)\.(\d).*`)
pyVersions := re.FindStringSubmatch(string(stdoutStderr[:]))
if len(pyVersions) != 3 {
return fmt.Errorf("cannot get parse Python version from %s", stdoutStderr)
pyVersion := fmt.Sprintf("%s%s", pyVersions[1], pyVersions[2])
var wheelName string
switch pyVersion {
case "36", "37":
wheelName = fmt.Sprintf("cp%s-cp%sm-manylinux1_x86_64.whl", pyVersion, pyVersion)
wheelName = fmt.Sprintf("cp%s-cp%s-manylinux1_x86_64.whl", pyVersion, pyVersion)
acceptableWhlSpecs = append(acceptableWhlSpecs, wheelName)
return nil
// installSetupPackages installs Beam SDK and user dependencies.
func installSetupPackages(files []string, workDir string, requirementsFiles []string) error {
log.Printf("Installing setup packages ...")
if err := setupAcceptableWheelSpecs(); err != nil {
log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
// Install the Dataflow Python SDK and worker packages.
// We install the extra requirements in case of using the beam sdk. These are ignored by pip
// if the user is using an SDK that does not provide these.
if err := installSdk(files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil {
return fmt.Errorf("failed to install SDK: %v", err)
// The staged files will not disappear due to restarts because workDir is a
// folder that is mapped to the host (and therefore survives restarts).
for _, f := range requirementsFiles {
if err := pipInstallRequirements(files, workDir, f); err != nil {
return fmt.Errorf("failed to install requirements: %v", err)
if err := installExtraPackages(files, extraPackagesFile, workDir); err != nil {
return fmt.Errorf("failed to install extra packages: %v", err)
if err := pipInstallPackage(files, workDir, workflowFile, false, true, nil); err != nil {
return fmt.Errorf("failed to install workflow: %v", err)
return nil
// joinPaths joins dir to every artifact path and returns the results in input
// order. Each / in a path is interpreted as a directory separator, and every
// result is cleaned by filepath.Join. It returns nil when no paths are given.
func joinPaths(dir string, paths ...string) []string {
	if len(paths) == 0 {
		return nil
	}
	ret := make([]string, len(paths))
	for i, p := range paths {
		ret[i] = filepath.Join(dir, filepath.FromSlash(p))
	}
	return ret
}
// Call the given function exactly once across multiple worker processes.
// The need for multiple processes is specific to the Python SDK due to the GIL.
// Should another SDK require it, this could be separated out as shared utility.
func multiProcessExactlyOnce(actionFunc func(), completeFileName string) {
installCompleteFile := filepath.Join(os.TempDir(), completeFileName)
// skip if install already complete, no need to lock
_, err := os.Stat(installCompleteFile)
if err == nil {
lock, err := lockfile.New(filepath.Join(os.TempDir(), completeFileName+".lck"))
if err != nil {
log.Fatalf("Cannot init artifact retrieval lock: %v", err)
for err = lock.TryLock(); err != nil; err = lock.TryLock() {
if _, ok := err.(lockfile.TemporaryError); ok {
time.Sleep(5 * time.Second)
log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
} else {
log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
defer lock.Unlock()
// skip if install already complete
_, err = os.Stat(installCompleteFile)
if err == nil {
// do the real work
// mark install complete
os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
// processArtifactsInSetupOnlyMode installs the dependencies found in artifacts
// when flag --setup_only and --artifacts exist. The setup mode will only
// process the provided artifacts and skip the actual worker program start up.
// The mode is useful for building new images with dependencies pre-installed so
// that the installation can be skipped at the pipeline runtime.
func processArtifactsInSetupOnlyMode() error {
if *artifacts == "" {
log.Fatal("No --artifacts provided along with --setup_only flag.")
workDir := filepath.Dir(*artifacts)
metadata, err := ioutil.ReadFile(*artifacts)
if err != nil {
log.Fatalf("Unable to open artifacts metadata file %v with error %v", *artifacts, err)
var infoJsons []string
if err := json.Unmarshal(metadata, &infoJsons); err != nil {
log.Fatalf("Unable to parse metadata, error: %v", err)
files := make([]string, len(infoJsons))
for i, info := range infoJsons {
var artifactInformation pipepb.ArtifactInformation
if err := jsonpb.UnmarshalString(info, &artifactInformation); err != nil {
log.Fatalf("Unable to unmarshal artifact information from json string %v", info)
// For now we only expect artifacts in file type. The condition should be revisited if the assumption is not valid any more.
if artifactInformation.GetTypeUrn() != standardArtifactFileTypeUrn {
log.Fatalf("Expect file artifact type in setup only mode, found %v.", artifactInformation.GetTypeUrn())
filePayload := &pipepb.ArtifactFilePayload{}
if err := proto.Unmarshal(artifactInformation.GetTypePayload(), filePayload); err != nil {
log.Fatal("Unable to unmarshal artifact information type payload.")
files[i] = filePayload.GetPath()
if setupErr := installSetupPackages(files, workDir, []string{requirementsFile}); setupErr != nil {
log.Fatalf("Failed to install required packages: %v", setupErr)
return nil