// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package gcsproxy contains artifact staging and retrieval servers backed by GCS.
package gcsproxy

import (
"bytes"
"crypto/sha256"
"encoding/hex"
"hash"
"path"
"sync"
"cloud.google.com/go/storage"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
)

// StagingServer is an artifact staging server backed by Google Cloud Storage
// (GCS). It commits a single manifest and ignores the staging ID.
type StagingServer struct {
manifest string
bucket, root string
blobs map[string]staged // guarded by mu
mu sync.Mutex
}
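
// staged describes an artifact that has already been uploaded: the GCS
// object it was written to (as a gs:// URI) and its hex-encoded SHA256.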
type staged struct {
object, hash string
}

// NewStagingServer creates an artifact staging server for the given manifest.
// It requires that the manifest is in GCS and will stage the supplied
// artifacts next to it.
func NewStagingServer(manifest string) (*StagingServer, error) {
bucket, object, err := gcsx.ParseObject(manifest)
if err != nil {
return nil, errors.Wrap(err, "invalid manifest location")
}
root := path.Join(path.Dir(object), "blobs")
return &StagingServer{
manifest: object,
bucket: bucket,
root: root,
blobs: make(map[string]staged),
}, nil
}
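
// A staging server is typically registered with a gRPC server and then driven
// by the runner's artifact staging client. A minimal sketch of that wiring,
// assuming a hypothetical bucket and the standard generated registration
// function:
//
//	srv, err := gcsproxy.NewStagingServer("gs://my-bucket/staging/manifest.json")
//	if err != nil {
//		// handle error
//	}
//	g := grpc.NewServer()
//	jobpb.RegisterLegacyArtifactStagingServiceServer(g, srv)
//	// Artifacts are then staged under gs://my-bucket/staging/blobs/<name>.
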
// CommitManifest commits the given artifact manifest to GCS.
func (s *StagingServer) CommitManifest(ctx context.Context, req *jobpb.CommitManifestRequest) (*jobpb.CommitManifestResponse, error) {
manifest := req.GetManifest()
s.mu.Lock()
loc, err := matchLocations(manifest.GetArtifact(), s.blobs)
if err != nil {
s.mu.Unlock()
return nil, err
}
s.mu.Unlock()
data, err := proto.Marshal(&jobpb.ProxyManifest{Manifest: manifest, Location: loc})
if err != nil {
return nil, errors.Wrap(err, "failed to marshal proxy manifest")
}
cl, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
if err != nil {
return nil, errors.Wrap(err, "failed to create GCS client")
}
if err := gcsx.WriteObject(ctx, cl, s.bucket, s.manifest, bytes.NewReader(data)); err != nil {
return nil, errors.Wrap(err, "failed to write manifest")
}
// Commit returns the location of the manifest as the token, which can
// then be used to configure the retrieval proxy. It is redundant right
// now, but would be needed for a staging server that serves multiple
// jobs. Such a server would also use the ID sent with each request.
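	// For example (hypothetical bucket), a manifest written to
	// gs://my-bucket/staging/manifest.json is returned verbatim as the token.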
return &jobpb.CommitManifestResponse{RetrievalToken: gcsx.MakeObject(s.bucket, s.manifest)}, nil
}

// matchLocations ensures that all artifacts have been staged and have valid
// content. It is acceptable for staged artifacts not to appear in the manifest.
func matchLocations(artifacts []*jobpb.ArtifactMetadata, blobs map[string]staged) ([]*jobpb.ProxyManifest_Location, error) {
var loc []*jobpb.ProxyManifest_Location
for _, a := range artifacts {
info, ok := blobs[a.Name]
if !ok {
return nil, errors.Errorf("artifact %v not staged", a.Name)
}
		if a.Sha256 == "" {
			// The manifest may omit the hash; adopt the staged one.
			a.Sha256 = info.hash
		}
if info.hash != a.Sha256 {
return nil, errors.Errorf("staged artifact for %v has invalid SHA256: %v, want %v", a.Name, info.hash, a.Sha256)
}
loc = append(loc, &jobpb.ProxyManifest_Location{Name: a.Name, Uri: info.object})
}
return loc, nil
}
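
// For example (hypothetical values), a manifest entry with an empty Sha256
// adopts the staged hash and matches, while a mismatched hash is rejected:
//
//	blobs := map[string]staged{"a.jar": {object: "gs://b/blobs/a.jar", hash: "abc"}}
//	loc, err := matchLocations([]*jobpb.ArtifactMetadata{{Name: "a.jar"}}, blobs)
//	// err == nil, loc[0].Uri == "gs://b/blobs/a.jar"
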
// PutArtifact stores the given artifact in GCS.
func (s *StagingServer) PutArtifact(ps jobpb.LegacyArtifactStagingService_PutArtifactServer) error {
// Read header
header, err := ps.Recv()
if err != nil {
return errors.Wrap(err, "failed to receive header")
}
md := header.GetMetadata().GetMetadata()
if md == nil {
return errors.Errorf("expected header as first message: %v", header)
}
object := path.Join(s.root, md.Name)
// Stream content to GCS. We don't have to worry about partial
// or abandoned writes, because object writes are atomic.
ctx := ps.Context()
cl, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
if err != nil {
return errors.Wrap(err, "failed to create GCS client")
}
r := &reader{sha256W: sha256.New(), stream: ps}
if err := gcsx.WriteObject(ctx, cl, s.bucket, object, r); err != nil {
return errors.Wrapf(err, "failed to stage artifact %v", md.Name)
}
hash := r.SHA256()
if md.Sha256 != "" && md.Sha256 != hash {
return errors.Errorf("invalid SHA256 for artifact %v: %v want %v", md.Name, hash, md.Sha256)
}
s.mu.Lock()
s.blobs[md.Name] = staged{object: gcsx.MakeObject(s.bucket, object), hash: hash}
s.mu.Unlock()
return ps.SendAndClose(&jobpb.PutArtifactResponse{})
}
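
// putArtifactSketch illustrates, from the client side, the message sequence
// PutArtifact expects: one metadata header, then one or more non-empty data
// chunks, then close. This is a sketch, not part of the package API; the
// stream would come from a generated LegacyArtifactStagingService client, and
// the message wrapper names assume the standard protoc-gen-go output.
func putArtifactSketch(stream jobpb.LegacyArtifactStagingService_PutArtifactClient, name string, content []byte) error {
	// Header first: carries the artifact metadata, mirroring the
	// header.GetMetadata().GetMetadata() read on the server side.
	header := &jobpb.PutArtifactRequest{
		Content: &jobpb.PutArtifactRequest_Metadata{
			Metadata: &jobpb.PutArtifactMetadata{
				Metadata: &jobpb.ArtifactMetadata{Name: name},
			},
		},
	}
	if err := stream.Send(header); err != nil {
		return err
	}
	// Then the content in bounded chunks. The server rejects empty chunks.
	const chunkSize = 1 << 20
	for len(content) > 0 {
		n := len(content)
		if n > chunkSize {
			n = chunkSize
		}
		chunk := &jobpb.PutArtifactRequest{
			Content: &jobpb.PutArtifactRequest_Data{
				Data: &jobpb.ArtifactChunk{Data: content[:n]},
			},
		}
		if err := stream.Send(chunk); err != nil {
			return err
		}
		content = content[n:]
	}
	_, err := stream.CloseAndRecv()
	return err
}
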
// reader is an adapter between the artifact stream and the GCS stream reader.
// It also computes the SHA256 of the content.
type reader struct {
sha256W hash.Hash
buf []byte
stream jobpb.LegacyArtifactStagingService_PutArtifactServer
}
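
// Read implements io.Reader over the upload stream: when the internal buffer
// is empty, it pulls the next chunk from the stream, then serves bytes from
// the buffer, feeding every byte served through the SHA256 hash. A single
// call never spans chunk boundaries, so it may return fewer bytes than
// requested, which the io.Reader contract permits.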
func (r *reader) Read(buf []byte) (int, error) {
if len(r.buf) == 0 {
// Buffer empty. Read from upload stream.
msg, err := r.stream.Recv()
if err != nil {
return 0, err // EOF or real error
}
r.buf = msg.GetData().GetData()
if len(r.buf) == 0 {
return 0, errors.New("empty chunk")
}
}
// Copy out bytes from non-empty buffer.
	n := copy(buf, r.buf)
if _, err := r.sha256W.Write(r.buf[:n]); err != nil {
panic(err) // cannot fail
}
r.buf = r.buf[n:]
return n, nil
}
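
// SHA256 returns the hex-encoded SHA256 digest of all bytes read so far. It
// is meaningful only after the stream has been fully consumed.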
func (r *reader) SHA256() string {
return hex.EncodeToString(r.sha256W.Sum(nil))
}