blob: 1ba1ac4120cafa6254b7026b5da504fb00e98af3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xlangx
import (
"context"
"path/filepath"
"strings"
"github.com/apache/beam/sdks/go/pkg/beam/artifact"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
)
// ResolveArtifacts acquires all dependencies for the cross-language
// transforms in edges, using a zero-value ResolveConfig (default staging and
// default SDK paths).
//
// It is a convenience wrapper around ResolveArtifactsWithConfig: the returned
// path mapping is discarded and any resolution error causes a panic. The
// pipeline argument p is not read by this function.
func ResolveArtifacts(ctx context.Context, edges []*graph.MultiEdge, p *pipepb.Pipeline) {
	if _, err := ResolveArtifactsWithConfig(ctx, edges, ResolveConfig{}); err != nil {
		panic(err)
	}
}
// ResolveConfig holds optional settings that alter how
// ResolveArtifactsWithConfig records resolved artifacts. The zero value is
// valid and selects the default behavior.
type ResolveConfig struct {
	// SdkPath, when non-empty, replaces the local staging directory in the
	// dependency paths written into the external environment proto, which the
	// SDK harness reads during pipeline execution. It exists so alternate
	// staging locations (for example, remote storage) can be referenced.
	//
	// SdkPath affects only those recorded paths. Every artifact is still
	// staged to the default local directory, and copying those local files to
	// SdkPath is the caller's responsibility.
	SdkPath string

	// JoinFn builds the recorded path for one artifact from SdkPath and the
	// artifact's name. It is only consulted when SdkPath is non-empty. When
	// nil, a default based on filepath.Join is used.
	JoinFn func(dir, file string) string
}
// defaultJoinFn is the JoinFn used when ResolveConfig.JoinFn is nil. It
// combines path and name with filepath.Join, which also cleans the result.
// One consequence is that a doubled slash such as the one in "gs://bucket"
// collapses to "gs:/bucket", so callers whose SdkPath is a URL should supply
// their own JoinFn.
func defaultJoinFn(path, name string) string {
	elems := []string{path, "/", name}
	return filepath.Join(elems...)
}
// ResolveArtifactsWithConfig acquires all dependencies for cross-language
// transforms, with behavior adjustable through cfg. For every edge in edges
// whose Op is graph.External, and for every environment in that edge's
// expanded components (environments whose ID starts with "go" are skipped),
// it:
//  1. Retrieves the environment's dependencies from the edge's expansion
//     service and stages each one as an artifact under the local directory
//     /tmp/artifacts.
//  2. Replaces the environment's dependencies with file artifacts that point
//     at the staged files (or at cfg.SdkPath, if set), preserving each
//     artifact's role and SHA-256 checksum.
//
// The behavior that can be configured is documented in ResolveConfig.
//
// It returns a map of "local path" to "sdk path". The two are identical unless
// cfg.SdkPath has been set. Dependencies are rewritten in place on the
// environment protos obtained from graphx.ExpandedComponents. On error, the
// returned map is nil, and environments handled before the failure may
// already have been rewritten.
func ResolveArtifactsWithConfig(ctx context.Context, edges []*graph.MultiEdge, cfg ResolveConfig) (paths map[string]string, err error) {
	tmpPath, err := filepath.Abs("/tmp/artifacts")
	if err != nil {
		return nil, errors.WithContext(err, "resolving remote artifacts")
	}
	if cfg.JoinFn == nil {
		cfg.JoinFn = defaultJoinFn
	}
	paths = make(map[string]string)
	for _, e := range edges {
		if e.Op != graph.External {
			continue
		}
		components, err := graphx.ExpandedComponents(e.External.Expanded)
		if err != nil {
			return nil, errors.WithContextf(err,
				"resolving remote artifacts for edge %v", e.Name())
		}
		for eid, env := range components.Environments {
			// Environments with a "go" ID prefix are skipped; presumably these
			// are Go SDK environments that need no remote resolution — confirm.
			if strings.HasPrefix(eid, "go") {
				continue
			}
			resolvedArtifacts, err := artifact.Materialize(ctx, e.External.ExpansionAddr, env.GetDependencies(), "", tmpPath)
			if err != nil {
				return nil, errors.WithContextf(err,
					"resolving remote artifacts for env %v in edge %v", eid, e.Name())
			}
			resolvedDeps := make([]*pipepb.ArtifactInformation, 0, len(resolvedArtifacts))
			for _, a := range resolvedArtifacts {
				// Keep the checksum: the file recorded at fullSdkPath is expected
				// to be the same file staged at fullTmpPath, so the hash still holds.
				name, sha256 := artifact.MustExtractFilePayload(a)
				fullTmpPath := filepath.Join(tmpPath, name)
				fullSdkPath := fullTmpPath
				if cfg.SdkPath != "" {
					fullSdkPath = cfg.JoinFn(cfg.SdkPath, name)
				}
				resolvedDeps = append(resolvedDeps,
					&pipepb.ArtifactInformation{
						TypeUrn: "beam:artifact:type:file:v1",
						TypePayload: protox.MustEncode(
							&pipepb.ArtifactFilePayload{
								Path:   fullSdkPath,
								Sha256: sha256,
							},
						),
						RoleUrn:     a.RoleUrn,
						RolePayload: a.RolePayload,
					},
				)
				paths[fullTmpPath] = fullSdkPath
			}
			env.Dependencies = resolvedDeps
		}
	}
	return paths, nil
}