blob: 9813c00015e7870930a8bdae6f4f93fa9bceafc5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
var (
pip = pipLocation()
// pipLocation returns the pip command to run. Users can point it at a custom
// pip by setting the "pip" environment variable; a set variable wins even if
// its value is empty. Otherwise the plain command name "pip" is returned.
func pipLocation() string {
	custom, found := os.LookupEnv("pip")
	if !found {
		return "pip"
	}
	return custom
}
// pipInstallRequirements installs the given requirement, if present.
func pipInstallRequirements(files []string, dir, name string) error {
for _, file := range files {
if file == name {
// We run the install process in two rounds in order to avoid as much
// as possible PyPI downloads. In the first round the --find-links
// option will make sure that only things staged in the worker will be
// used without following their dependencies.
args := []string{"install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
if err := execx.Execute(pip, args...); err != nil {
return err
// The second install round opens up the search for packages on PyPI and
// also installs dependencies. The key is that if all the packages have
// been installed in the first round then this command will be a no-op.
args = []string{"install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--find-links", dir}
return execx.Execute(pip, args...)
return nil
// pipInstallPackage installs the given package, if present.
func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
for _, file := range files {
if file == name {
var packageSpec = name
if extras != nil {
packageSpec += "[" + strings.Join(extras, ",") + "]"
if force {
// We only use force reinstallation for packages specified using the
// --extra_package flag. In this case, we always want to use the
// user-specified package, overwriting any existing package already
// installed. At the same time, we want to avoid reinstalling any
// dependencies. The "pip install" command doesn't have a clean way to do
// this, so we do this in two steps.
// First, we use the three flags "--upgrade --force-reinstall --no-deps"
// to "pip install" so as to force the package to be reinstalled, while
// avoiding reinstallation of dependencies. Note now that if any needed
// dependencies were not installed, they will still be missing.
// Next, we run "pip install" on the package without these flags. Since the
// installed version will match the package specified, the package itself
// will not be reinstalled, but its dependencies will now be resolved and
// installed if necessary. This achieves our goal outlined above.
args := []string{"install", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
filepath.Join(dir, packageSpec)}
err := execx.Execute(pip, args...)
if err != nil {
return err
args = []string{"install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
return execx.Execute(pip, args...)
// Case when we do not perform a forced reinstall.
args := []string{"install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
return execx.Execute(pip, args...)
if optional {
return nil
return errors.New("package '" + name + "' not found")
// installExtraPackages installs all the packages declared in the extra
// packages manifest file.
func installExtraPackages(files []string, extraPackagesFile, dir string) error {
// First check that extra packages manifest file is present.
for _, file := range files {
if file != extraPackagesFile {
// Found the manifest. Install extra packages.
manifest, err := ioutil.ReadFile(filepath.Join(dir, extraPackagesFile))
if err != nil {
return fmt.Errorf("failed to read extra packages manifest file: %v", err)
s := bufio.NewScanner(bytes.NewReader(manifest))
for s.Scan() {
extraPackage := s.Text()
log.Printf("Installing extra package: %s", extraPackage)
if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
return nil
return nil
// findBeamSdkWhl returns the first entry of files whose name starts with
// "apache_beam" and ends with any of acceptableWhlSpecs, logging the match.
// Entries are checked in the order of files, not of acceptableWhlSpecs.
// It returns "" when no entry qualifies.
func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
	hasAcceptableSuffix := func(name string) bool {
		for _, suffix := range acceptableWhlSpecs {
			if strings.HasSuffix(name, suffix) {
				return true
			}
		}
		return false
	}

	for _, candidate := range files {
		if strings.HasPrefix(candidate, "apache_beam") && hasAcceptableSuffix(candidate) {
			log.Printf("Found Apache Beam SDK wheel: %v", candidate)
			return candidate
		}
	}
	return ""
}
// InstallSdk installs Beam SDK: First, we try to find a compiled
// wheel distribution of Apache Beam among staged files. If we find it, we
// assume that the pipleine was started with the Beam SDK found in the wheel
// file, and we try to install it. If not successful, we fall back to installing
// SDK from source tarball provided in sdkSrcFile.
func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error {
sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
if sdkWhlFile != "" {
err := pipInstallPackage(files, workDir, sdkWhlFile, false, false, []string{"gcp"})
if err == nil {
return nil
log.Printf("Could not install Apache Beam SDK from a wheel: %v, proceeding to install SDK from source tarball.", err)
if !required {
_, err := os.Stat(filepath.Join(workDir, sdkSrcFile))
if os.IsNotExist(err) {
return nil
err := pipInstallPackage(files, workDir, sdkSrcFile, false, false, []string{"gcp"})
return err