// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package artifact contains utilities for staging and retrieving artifacts.
package artifact

import (
"bufio"
"context"
"crypto/sha256"
"encoding/hex"
"io"
"log"
"math/rand"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/proto"
)
// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto
const (
	// URNFileArtifact is the type URN for artifacts staged as a single file.
	URNFileArtifact = "beam:artifact:type:file:v1"
	// URNPipRequirementsFile is the role URN for Python pip requirements files.
	URNPipRequirementsFile = "beam:artifact:role:pip_requirements_file:v1"
	// URNStagingTo is the role URN for artifacts staged to a known destination path.
	URNStagingTo = "beam:artifact:role:staging_to:v1"
	// NoArtifactsStaged is the sentinel retrieval token indicating that no
	// artifacts were staged for the job.
	NoArtifactsStaged = "__no_artifacts_staged__"
)
// Materialize is a convenience helper for ensuring that all artifacts are
// present and uncorrupted. It interprets each artifact name as a relative
// path under the dest directory. Valid artifacts already present are not
// retrieved again. The rt argument is the retrieval token used by the legacy
// staging service when no dependencies are supplied directly.
// TODO(BEAM-9577): Return a mapping of filename to dependency, rather than []*jobpb.ArtifactMetadata.
// TODO(BEAM-9577): Leverage richness of roles rather than magic names to understand artifacts.
func Materialize(ctx context.Context, endpoint string, dependencies []*pipepb.ArtifactInformation, rt string, dest string) ([]*pipepb.ArtifactInformation, error) {
if len(dependencies) > 0 {
return newMaterialize(ctx, endpoint, dependencies, dest)
} else if rt == "" || rt == NoArtifactsStaged {
return []*pipepb.ArtifactInformation{}, nil
} else {
return legacyMaterialize(ctx, endpoint, rt, dest)
}
}
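
// A minimal caller-side sketch of Materialize (the endpoint, retrieval token,
// and destination below are illustrative assumptions, not fixed values):
//
//	ctx := context.Background()
//	deps, err := artifact.Materialize(ctx, "localhost:8098", nil, "token-123", "/tmp/staged")
//	if err != nil {
//		// handle the retrieval failure
//	}
//	for _, dep := range deps {
//		path, _ := artifact.MustExtractFilePayload(dep)
//		_ = path // relative staged path of each materialized dependency
//	}
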
// newMaterialize connects to the retrieval service at endpoint and
// materializes the given dependencies into dest.
func newMaterialize(ctx context.Context, endpoint string, dependencies []*pipepb.ArtifactInformation, dest string) ([]*pipepb.ArtifactInformation, error) {
cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
if err != nil {
return nil, err
}
defer cc.Close()
return newMaterializeWithClient(ctx, jobpb.NewArtifactRetrievalServiceClient(cc), dependencies, dest)
}
// newMaterializeWithClient resolves the given dependencies through the
// retrieval service, rewrites each replacement as a staged file artifact, and
// concurrently retrieves the staged files into dest.
func newMaterializeWithClient(ctx context.Context, client jobpb.ArtifactRetrievalServiceClient, dependencies []*pipepb.ArtifactInformation, dest string) ([]*pipepb.ArtifactInformation, error) {
resolution, err := client.ResolveArtifacts(ctx, &jobpb.ResolveArtifactsRequest{Artifacts: dependencies})
if err != nil {
return nil, err
}
var artifacts []*pipepb.ArtifactInformation
var list []retrievable
for _, dep := range resolution.Replacements {
path, err := extractStagingToPath(dep)
if err != nil {
return nil, err
}
filePayload := pipepb.ArtifactFilePayload{
Path: path,
}
if dep.TypeUrn == URNFileArtifact {
typePayload := pipepb.ArtifactFilePayload{}
if err := proto.Unmarshal(dep.TypePayload, &typePayload); err != nil {
return nil, errors.Wrap(err, "failed to parse artifact file payload")
}
filePayload.Sha256 = typePayload.Sha256
}
newTypePayload, err := proto.Marshal(&filePayload)
if err != nil {
return nil, errors.Wrap(err, "failed to create artifact type payload")
}
artifacts = append(artifacts, &pipepb.ArtifactInformation{
TypeUrn: URNFileArtifact,
TypePayload: newTypePayload,
RoleUrn: dep.RoleUrn,
RolePayload: dep.RolePayload,
})
rolePayload, err := proto.Marshal(&pipepb.ArtifactStagingToRolePayload{
StagedName: path,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create artifact role payload")
}
list = append(list, &artifact{
client: client,
dep: &pipepb.ArtifactInformation{
TypeUrn: dep.TypeUrn,
TypePayload: dep.TypePayload,
RoleUrn: URNStagingTo,
RolePayload: rolePayload,
},
})
}
return artifacts, MultiRetrieve(ctx, 10, list, dest)
}
// idCounter is used for generating unique IDs: staged files that arrive
// without a staging name are assigned uniquely generated names.
var idCounter uint64
func generateId() string {
id := atomic.AddUint64(&idCounter, 1)
return strconv.FormatUint(id, 10)
}
// extractStagingToPath determines the staged name of an artifact: the name
// recorded in a staging_to role if present, otherwise a freshly generated
// unique name for plain file artifacts.
func extractStagingToPath(artifact *pipepb.ArtifactInformation) (string, error) {
var stagedName string
if artifact.RoleUrn == URNStagingTo {
role := pipepb.ArtifactStagingToRolePayload{}
if err := proto.Unmarshal(artifact.RolePayload, &role); err != nil {
return "", err
}
stagedName = role.StagedName
} else if artifact.TypeUrn == URNFileArtifact {
ty := pipepb.ArtifactFilePayload{}
if err := proto.Unmarshal(artifact.TypePayload, &ty); err != nil {
return "", err
}
stagedName = generateId() + "-" + filepath.Base(ty.Path)
} else {
return "", errors.Errorf("failed to extract staging path for artifact type %v role %v", artifact.TypeUrn, artifact.RoleUrn)
}
return stagedName, nil
}
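
// For illustration, an artifact that extractStagingToPath resolves via its
// staging_to role could be constructed as below (the staged name is a
// hypothetical example):
//
//	rolePayload, _ := proto.Marshal(&pipepb.ArtifactStagingToRolePayload{
//		StagedName: "deps/example.jar",
//	})
//	info := &pipepb.ArtifactInformation{
//		RoleUrn:     URNStagingTo,
//		RolePayload: rolePayload,
//	}
//	name, _ := extractStagingToPath(info) // name == "deps/example.jar"
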
// MustExtractFilePayload returns the path and sha256 of a file artifact's
// type payload. It logs fatally if the artifact is not a file artifact or if
// its payload cannot be parsed.
func MustExtractFilePayload(artifact *pipepb.ArtifactInformation) (string, string) {
	if artifact.TypeUrn != URNFileArtifact {
		log.Fatalf("Unsupported artifact type %v", artifact.TypeUrn)
	}
	ty := pipepb.ArtifactFilePayload{}
	if err := proto.Unmarshal(artifact.TypePayload, &ty); err != nil {
		log.Fatalf("failed to parse artifact file payload: %v", err)
	}
	return ty.Path, ty.Sha256
}
type artifact struct {
client jobpb.ArtifactRetrievalServiceClient
dep *pipepb.ArtifactInformation
}
// retrieve fetches the artifact via the retrieval service and writes it under
// dest, replacing any existing file at the staged path.
func (a artifact) retrieve(ctx context.Context, dest string) error {
path, err := extractStagingToPath(a.dep)
if err != nil {
return err
}
filename := filepath.Join(dest, filepath.FromSlash(path))
_, err = os.Stat(filename)
if err == nil {
if err = os.Remove(filename); err != nil {
return errors.Errorf("failed to delete: %v (remove: %v)", filename, err)
}
} else if !os.IsNotExist(err) {
return errors.Wrapf(err, "failed to stat %v", filename)
}
if err := os.MkdirAll(filepath.Dir(filename), os.ModePerm); err != nil {
return err
}
stream, err := a.client.GetArtifact(ctx, &jobpb.GetArtifactRequest{Artifact: a.dep})
if err != nil {
return err
}
fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0755)
if err != nil {
return err
}
w := bufio.NewWriter(fd)
sha256Hash, err := writeChunks(stream, w)
if err != nil {
fd.Close() // drop any buffered content
return errors.Wrapf(err, "failed to retrieve chunk for %v", filename)
}
if err := w.Flush(); err != nil {
fd.Close()
return errors.Wrapf(err, "failed to flush chunks for %v", filename)
}
	if stat, err := fd.Stat(); err == nil {
		log.Printf("Downloaded: %v (sha256: %v, size: %v)", filename, sha256Hash, stat.Size())
	} else {
		log.Printf("Downloaded: %v (sha256: %v)", filename, sha256Hash)
	}
	return fd.Close()
}
// writeChunks copies the streamed artifact chunks to w and returns the
// hex-encoded SHA256 of the written content.
func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) {
sha256W := sha256.New()
for {
chunk, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return "", err
}
if _, err := sha256W.Write(chunk.Data); err != nil {
panic(err) // cannot fail
}
if _, err := w.Write(chunk.Data); err != nil {
return "", errors.Wrapf(err, "chunk write failed")
}
}
return hex.EncodeToString(sha256W.Sum(nil)), nil
}
// legacyMaterialize retrieves artifacts using the legacy artifact retrieval
// service, based on the manifest identified by the retrieval token rt.
func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest string) ([]*pipepb.ArtifactInformation, error) {
cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
if err != nil {
return nil, err
}
defer cc.Close()
client := jobpb.NewLegacyArtifactRetrievalServiceClient(cc)
m, err := client.GetManifest(ctx, &jobpb.GetManifestRequest{RetrievalToken: rt})
if err != nil {
return nil, errors.Wrap(err, "failed to get manifest")
}
mds := m.GetManifest().GetArtifact()
var artifacts []*pipepb.ArtifactInformation
var list []retrievable
for _, md := range mds {
typePayload, err := proto.Marshal(&pipepb.ArtifactFilePayload{
Path: md.Name,
Sha256: md.Sha256,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create artifact type payload")
}
rolePayload, err := proto.Marshal(&pipepb.ArtifactStagingToRolePayload{
StagedName: md.Name,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create artifact role payload")
}
artifacts = append(artifacts, &pipepb.ArtifactInformation{
TypeUrn: URNFileArtifact,
TypePayload: typePayload,
RoleUrn: URNStagingTo,
RolePayload: rolePayload,
})
list = append(list, &legacyArtifact{
client: client,
rt: rt,
md: md,
})
}
return artifacts, MultiRetrieve(ctx, 10, list, dest)
}
// MultiRetrieve is a convenience wrapper that retrieves multiple artifacts
// concurrently, using at most 'cpus' goroutines. It retries each failed
// artifact a few times before giving up with a permanent error.
func MultiRetrieve(ctx context.Context, cpus int, list []retrievable, dest string) error {
if len(list) == 0 {
return nil
}
if cpus < 1 {
cpus = 1
}
if len(list) < cpus {
cpus = len(list)
}
q := slice2queue(list)
var permErr errorx.GuardedError
var wg sync.WaitGroup
for i := 0; i < cpus; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for a := range q {
if permErr.Error() != nil {
continue
}
const attempts = 3
var failures []string
for {
err := a.retrieve(ctx, dest)
if err == nil || permErr.Error() != nil {
break // done or give up
}
failures = append(failures, err.Error())
if len(failures) > attempts {
						permErr.TrySetError(errors.Errorf("failed to retrieve %v in %v attempts: %v", dest, len(failures), strings.Join(failures, "; ")))
break // give up
}
time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
}
}
}()
}
wg.Wait()
return permErr.Error()
}
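
// For example, fetching a prepared list of retrievables with up to four
// concurrent workers might look like this (the list and directory are
// assumptions):
//
//	if err := MultiRetrieve(ctx, 4, list, "/tmp/staged"); err != nil {
//		// permanent failure: some artifact failed after all retry attempts
//	}
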
// retrievable is a handle to a single artifact that can be fetched into a
// destination directory.
type retrievable interface {
	retrieve(ctx context.Context, dest string) error
}
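
// Any type with a matching retrieve method satisfies the interface. A purely
// illustrative local-copy implementation (not part of this package's API)
// could look like:
//
//	type localFile struct{ src string }
//
//	func (l localFile) retrieve(ctx context.Context, dest string) error {
//		in, err := os.Open(l.src)
//		if err != nil {
//			return err
//		}
//		defer in.Close()
//		out, err := os.Create(filepath.Join(dest, filepath.Base(l.src)))
//		if err != nil {
//			return err
//		}
//		defer out.Close()
//		_, err = io.Copy(out, in)
//		return err
//	}
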
// LegacyMultiRetrieve is exported for testing.
func LegacyMultiRetrieve(ctx context.Context, client jobpb.LegacyArtifactRetrievalServiceClient, cpus int, list []*jobpb.ArtifactMetadata, rt string, dest string) error {
var rlist []retrievable
for _, md := range list {
rlist = append(rlist, &legacyArtifact{
client: client,
rt: rt,
md: md,
})
}
return MultiRetrieve(ctx, cpus, rlist, dest)
}
type legacyArtifact struct {
client jobpb.LegacyArtifactRetrievalServiceClient
rt string
md *jobpb.ArtifactMetadata
}
func (a legacyArtifact) retrieve(ctx context.Context, dest string) error {
return Retrieve(ctx, a.client, a.md, a.rt, dest)
}
// Retrieve checks whether the given artifact is already successfully
// retrieved. If not, it retrieves into the dest directory. It overwrites any
// previous retrieval attempt and may leave a corrupt/partial local file on
// failure.
func Retrieve(ctx context.Context, client jobpb.LegacyArtifactRetrievalServiceClient, a *jobpb.ArtifactMetadata, rt string, dest string) error {
filename := filepath.Join(dest, filepath.FromSlash(a.Name))
_, err := os.Stat(filename)
if err != nil && !os.IsNotExist(err) {
return errors.Wrapf(err, "failed to stat %v", filename)
}
if err == nil {
// File already exists. Validate or delete.
hash, err := computeSHA256(filename)
if err == nil && a.Sha256 == hash {
// NOTE(herohde) 10/5/2017: We ignore permissions here, because
// they may differ from the requested permissions due to umask
// settings on unix systems (which we in turn want to respect).
// We have no good way to know what to expect and thus assume
// any permissions are fine.
return nil
}
if err2 := os.Remove(filename); err2 != nil {
return errors.Errorf("failed to both validate %v and delete: %v (remove: %v)", filename, err, err2)
} // else: successfully deleted bad file.
} // else: file does not exist.
if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
return err
}
return retrieve(ctx, client, a, rt, filename)
}
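
// A hedged usage sketch for Retrieve (the metadata, token, and paths are
// assumptions for illustration):
//
//	md := &jobpb.ArtifactMetadata{Name: "deps/example.jar"}
//	if err := Retrieve(ctx, client, md, "retrieval-token", "/tmp/staged"); err != nil {
//		// the artifact could not be fetched or failed validation
//	}
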
// retrieve retrieves the given artifact and stores it as the given filename.
// It validates that the content matches the given SHA256, when one is
// present, and fails otherwise. It assumes the file does not already exist;
// it does not clean up on failure and may leave behind a corrupt, partially
// written file.
func retrieve(ctx context.Context, client jobpb.LegacyArtifactRetrievalServiceClient, a *jobpb.ArtifactMetadata, rt string, filename string) error {
stream, err := client.GetArtifact(ctx, &jobpb.LegacyGetArtifactRequest{Name: a.Name, RetrievalToken: rt})
if err != nil {
return err
}
fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(a.Permissions))
if err != nil {
return err
}
w := bufio.NewWriter(fd)
sha256Hash, err := retrieveChunks(stream, w)
if err != nil {
fd.Close() // drop any buffered content
return errors.Wrapf(err, "failed to retrieve chunk for %v", filename)
}
if err := w.Flush(); err != nil {
fd.Close()
return errors.Wrapf(err, "failed to flush chunks for %v", filename)
}
if err := fd.Close(); err != nil {
return err
}
	// The Sha256 hash is an optional field in the metadata, so only validate when it's present.
if a.Sha256 != "" && sha256Hash != a.Sha256 {
return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256)
}
return nil
}
// retrieveChunks copies the streamed artifact chunks to w and returns the
// hex-encoded SHA256 of the written content.
func retrieveChunks(stream jobpb.LegacyArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) {
sha256W := sha256.New()
for {
chunk, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return "", err
}
if _, err := sha256W.Write(chunk.Data); err != nil {
panic(err) // cannot fail
}
if _, err := w.Write(chunk.Data); err != nil {
return "", errors.Wrapf(err, "chunk write failed")
}
}
return hex.EncodeToString(sha256W.Sum(nil)), nil
}
// computeSHA256 returns the hex-encoded SHA256 digest of the named file.
func computeSHA256(filename string) (string, error) {
fd, err := os.Open(filename)
if err != nil {
return "", err
}
defer fd.Close()
sha256W := sha256.New()
data := make([]byte, 1<<20)
for {
n, err := fd.Read(data)
if n > 0 {
if _, err := sha256W.Write(data[:n]); err != nil {
panic(err) // cannot fail
}
}
if err == io.EOF {
break
}
if err != nil {
return "", err
}
}
return hex.EncodeToString(sha256W.Sum(nil)), nil
}
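
// Illustrative use of computeSHA256 to decide whether a prior download can be
// reused (the filename and expected digest are assumptions):
//
//	sum, err := computeSHA256("/tmp/staged/deps/example.jar")
//	if err == nil && sum == expectedSha256 {
//		// file already present and valid; skip re-retrieval
//	}
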
// slice2queue converts a slice of retrievables into a closed, buffered
// channel usable as a work queue.
func slice2queue(list []retrievable) chan retrievable {
q := make(chan retrievable, len(list))
for _, elm := range list {
q <- elm
}
close(q)
return q
}
// queue2slice drains a channel of artifact metadata into a slice.
func queue2slice(q chan *jobpb.ArtifactMetadata) []*jobpb.ArtifactMetadata {
var ret []*jobpb.ArtifactMetadata
for elm := range q {
ret = append(ret, elm)
}
return ret
}