blob: 8157a6d94f26f643a64daf80b181ec6dea528573 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bufio"
"bytes"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
)
var (
pip = pipLocation()
)
func pipLocation() string {
if v, ok := os.LookupEnv("pip"); ok {
return v
}
return "/usr/local/bin/pip"
}
// pipInstallRequirements installs the given requirement, if present.
func pipInstallRequirements(files []string, dir, name string) error {
for _, file := range files {
if file == name {
// We run the install process in two rounds in order to avoid as much
// as possible PyPI downloads. In the first round the --find-links
// option will make sure that only things staged in the worker will be
// used without following their dependencies.
args := []string{"install", "-r", filepath.Join(dir, name), "--no-index", "--no-deps", "--find-links", dir}
if err := execx.Execute(pip, args...); err != nil {
return err
}
// The second install round opens up the search for packages on PyPI and
// also installs dependencies. The key is that if all the packages have
// been installed in the first round then this command will be a no-op.
args = []string{"install", "-r", filepath.Join(dir, name), "--find-links", dir}
return execx.Execute(pip, args...)
}
}
return nil
}
// pipInstallPackage installs the given package, if present.
func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
for _, file := range files {
if file == name {
var packageSpec = name
if extras != nil {
packageSpec += "[" + strings.Join(extras, ",") + "]"
}
if force {
// We only use force reinstallation for packages specified using the
// --extra_package flag. In this case, we always want to use the
// user-specified package, overwriting any existing package already
// installed. At the same time, we want to avoid reinstalling any
// dependencies. The "pip install" command doesn't have a clean way to do
// this, so we do this in two steps.
//
// First, we use the three flags "--upgrade --force-reinstall --no-deps"
// to "pip install" so as to force the package to be reinstalled, while
// avoiding reinstallation of dependencies. Note now that if any needed
// dependencies were not installed, they will still be missing.
//
// Next, we run "pip install" on the package without any flags. Since the
// installed version will match the package specified, the package itself
// will not be reinstalled, but its dependencies will now be resolved and
// installed if necessary. This achieves our goal outlined above.
args := []string{"install", "--upgrade", "--force-reinstall", "--no-deps",
filepath.Join(dir, packageSpec)}
err := execx.Execute(pip, args...)
if err != nil {
return err
}
args = []string{"install", filepath.Join(dir, packageSpec)}
return execx.Execute(pip, args...)
}
// Case when we do not perform a forced reinstall.
args := []string{"install", filepath.Join(dir, packageSpec)}
return execx.Execute(pip, args...)
}
}
if optional {
return nil
}
return errors.New("package '" + name + "' not found")
}
// installExtraPackages installs all the packages declared in the extra
// packages manifest file.
func installExtraPackages(files []string, extraPackagesFile, dir string) error {
// First check that extra packages manifest file is present.
for _, file := range files {
if file != extraPackagesFile {
continue
}
// Found the manifest. Install extra packages.
manifest, err := ioutil.ReadFile(filepath.Join(dir, extraPackagesFile))
if err != nil {
return fmt.Errorf("failed to read extra packages manifest file: %v", err)
}
s := bufio.NewScanner(bytes.NewReader(manifest))
s.Split(bufio.ScanLines)
for s.Scan() {
extraPackage := s.Text()
log.Printf("Installing extra package: %s", extraPackage)
if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
}
}
return nil
}
return nil
}
func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
for _, file := range files {
if strings.HasPrefix(file, "apache_beam") {
for _, s := range acceptableWhlSpecs {
if strings.HasSuffix(file, s) {
log.Printf("Found Apache Beam SDK wheel: %v", file)
return file
}
}
}
}
return ""
}
// InstallSdk installs Beam SDK: First, we try to find a compiled
// wheel distribution of Apache Beam among staged files. If we find it, we
// assume that the pipleine was started with the Beam SDK found in the wheel
// file, and we try to install it. If not successful, we fall back to installing
// SDK from source tarball provided in sdkSrcFile.
func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error {
sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
if sdkWhlFile != "" {
err := pipInstallPackage(files, workDir, sdkWhlFile, false, false, []string{"gcp"})
if err == nil {
return nil
}
log.Printf("Could not install Apache Beam SDK from a wheel: %v, proceeding to install SDK from source tarball.", err)
}
if !required {
_, err := os.Stat(filepath.Join(workDir, sdkSrcFile))
if os.IsNotExist(err) {
return nil
}
}
err := pipInstallPackage(files, workDir, sdkSrcFile, false, false, []string{"gcp"})
return err
}