[BEAM-9920][BEAM-11418] Go Xlang artifact staging to GCP w/ Dataflow (#14748)
Adjusts the Go Dataflow runner to stage cross-language artifacts to GCP as well, and adds them to the list of packages for workers. This fixes a few failing cross-language pipelines that required artifacts to be read. Also refactors the artifact staging code slightly to allow for staging remotely in a more generic fashion, and not just to GCP.
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
index 9ea5099..1ba1ac4 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
@@ -24,53 +24,105 @@
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+ "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
)
// ResolveArtifacts acquires all dependencies for a cross-language transform
func ResolveArtifacts(ctx context.Context, edges []*graph.MultiEdge, p *pipepb.Pipeline) {
- path, err := filepath.Abs("/tmp/artifacts")
+ _, err := ResolveArtifactsWithConfig(ctx, edges, ResolveConfig{})
if err != nil {
panic(err)
}
+}
+
+// ResolveConfig contains fields for configuring the behavior for resolving
+// artifacts.
+type ResolveConfig struct {
+ // SdkPath replaces the default filepath for dependencies, but only in the
+ // external environment proto to be used by the SDK Harness during pipeline
+ // execution. This is used to specify alternate staging directories, such
+ // as for staging artifacts remotely.
+ //
+ // Setting an SdkPath does not change staging behavior otherwise. All
+ // artifacts still get staged to the default local filepath, and it is the
+ // user's responsibility to stage those local artifacts to the SdkPath.
+ SdkPath string
+
+ // JoinFn is a function for combining SdkPath and individual artifact names.
+ // If not specified, it defaults to using filepath.Join.
+ JoinFn func(path, name string) string
+}
+
+func defaultJoinFn(path, name string) string {
+ return filepath.Join(path, "/", name)
+}
+
+// ResolveArtifactsWithConfig acquires all dependencies for cross-language
+// transforms, but with some additional configuration of its behavior. By default,
+// this function performs the following steps for each cross-language transform
+// in the list of edges:
+// 1. Retrieves a list of dependencies needed from the expansion service.
+// 2. Retrieves each dependency as an artifact and stages it to a default
+// local filepath.
+// 3. Adds the dependencies to the transform's stored environment proto.
+// The changes that can be configured are documented in ResolveConfig.
+//
+// This returns a map of "local path" to "sdk path". By default these are
+// identical, unless ResolveConfig.SdkPath has been set.
+func ResolveArtifactsWithConfig(ctx context.Context, edges []*graph.MultiEdge, cfg ResolveConfig) (paths map[string]string, err error) {
+ tmpPath, err := filepath.Abs("/tmp/artifacts")
+ if err != nil {
+ return nil, errors.WithContext(err, "resolving remote artifacts")
+ }
+ if cfg.JoinFn == nil {
+ cfg.JoinFn = defaultJoinFn
+ }
+ paths = make(map[string]string)
for _, e := range edges {
if e.Op == graph.External {
components, err := graphx.ExpandedComponents(e.External.Expanded)
if err != nil {
- panic(err)
+ return nil, errors.WithContextf(err,
+ "resolving remote artifacts for edge %v", e.Name())
}
envs := components.Environments
for eid, env := range envs {
-
if strings.HasPrefix(eid, "go") {
continue
}
deps := env.GetDependencies()
- resolvedArtifacts, err := artifact.Materialize(ctx, e.External.ExpansionAddr, deps, "", path)
+ resolvedArtifacts, err := artifact.Materialize(ctx, e.External.ExpansionAddr, deps, "", tmpPath)
if err != nil {
- panic(err)
+ return nil, errors.WithContextf(err,
+ "resolving remote artifacts for env %v in edge %v", eid, e.Name())
}
var resolvedDeps []*pipepb.ArtifactInformation
for _, a := range resolvedArtifacts {
- name, sha256 := artifact.MustExtractFilePayload(a)
- fullPath := filepath.Join(path, "/", name)
+ name, _ := artifact.MustExtractFilePayload(a)
+ fullTmpPath := filepath.Join(tmpPath, "/", name)
+ fullSdkPath := fullTmpPath
+ if len(cfg.SdkPath) > 0 {
+ fullSdkPath = cfg.JoinFn(cfg.SdkPath, name)
+ }
resolvedDeps = append(resolvedDeps,
&pipepb.ArtifactInformation{
TypeUrn: "beam:artifact:type:file:v1",
TypePayload: protox.MustEncode(
&pipepb.ArtifactFilePayload{
- Path: fullPath,
- Sha256: sha256,
+ Path: fullSdkPath,
},
),
RoleUrn: a.RoleUrn,
RolePayload: a.RolePayload,
},
)
+ paths[fullTmpPath] = fullSdkPath
}
env.Dependencies = resolvedDeps
}
}
}
+ return paths, nil
}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index c46ff4c..cd7be52 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -178,11 +178,23 @@
}
// (1) Build and submit
+ // NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
+ id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())
+
+ modelURL := gcsx.Join(*stagingLocation, id, "model")
+ workerURL := gcsx.Join(*stagingLocation, id, "worker")
+ jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
+ xlangURL := gcsx.Join(*stagingLocation, id, "xlang")
edges, _, err := p.Build()
if err != nil {
return nil, err
}
+ artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, opts.Project, xlangURL)
+ if err != nil {
+ return nil, errors.WithContext(err, "resolving cross-language artifacts")
+ }
+ opts.ArtifactURLs = artifactURLs
environment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)
if err != nil {
return nil, errors.WithContext(err, "creating environment for model pipeline")
@@ -196,13 +208,6 @@
return nil, errors.WithContext(err, "applying container image overrides")
}
- // NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
- id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())
-
- modelURL := gcsx.Join(*stagingLocation, id, "model")
- workerURL := gcsx.Join(*stagingLocation, id, "worker")
- jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
-
if *dryRun {
log.Info(ctx, "Dry-run: not submitting job!")
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 511b962..390082b 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -56,6 +56,7 @@
WorkerRegion string
WorkerZone string
ContainerImage string
+ ArtifactURLs []string // Additional packages for workers.
// Autoscaling settings
Algorithm string
@@ -128,6 +129,15 @@
experiments = append(experiments, "use_staged_dataflow_worker_jar")
}
+ for _, url := range opts.ArtifactURLs {
+ name := url[strings.LastIndexAny(url, "/")+1:]
+ pkg := &df.Package{
+ Name: name,
+ Location: url,
+ }
+ packages = append(packages, pkg)
+ }
+
ipConfiguration := "WORKER_IP_UNSPECIFIED"
if opts.NoUsePublicIPs {
ipConfiguration = "WORKER_IP_PRIVATE"
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
index 67a2bde..49ca5bf 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
@@ -22,6 +22,8 @@
"os"
"cloud.google.com/go/storage"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/xlangx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
)
@@ -54,3 +56,28 @@
_, err = gcsx.Upload(ctx, client, project, bucket, obj, r)
return err
}
+
+// ResolveXLangArtifacts resolves cross-language artifacts with a given GCS
+// URL as a destination, and then stages all local artifacts to that URL. This
+// function returns a list of staged artifact URLs.
+func ResolveXLangArtifacts(ctx context.Context, edges []*graph.MultiEdge, project, url string) ([]string, error) {
+ cfg := xlangx.ResolveConfig{
+ SdkPath: url,
+ JoinFn: func(url, name string) string {
+ return gcsx.Join(url, "/", name)
+ },
+ }
+ paths, err := xlangx.ResolveArtifactsWithConfig(ctx, edges, cfg)
+ if err != nil {
+ return nil, err
+ }
+ var urls []string
+ for local, remote := range paths {
+ err := StageFile(ctx, project, remote, local)
+ if err != nil {
+ return nil, errors.WithContextf(err, "staging file to %v", remote)
+ }
+ urls = append(urls, remote)
+ }
+ return urls, nil
+}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index f01a71f..af14b38 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -79,6 +79,9 @@
getEnvCfg = srv.EnvironmentConfig
}
+ // Fetch all dependencies for cross-language transforms
+ xlangx.ResolveArtifacts(ctx, edges, nil)
+
environment, err := graphx.CreateEnvironment(ctx, envUrn, getEnvCfg)
if err != nil {
return nil, errors.WithContextf(err, "generating model pipeline")
@@ -88,9 +91,6 @@
return nil, errors.WithContextf(err, "generating model pipeline")
}
- // Fetch all dependencies for cross-language transforms
- xlangx.ResolveArtifacts(ctx, edges, pipeline)
-
log.Info(ctx, proto.MarshalTextString(pipeline))
opt := &runnerlib.JobOptions{
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index c756e05..7db20ac 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -78,12 +78,6 @@
var dataflowFilters = []string{
// TODO(BEAM-11576): TestFlattenDup failing on this runner.
"TestFlattenDup",
- // TODO(BEAM-11418): These tests require implementing x-lang artifact
- // staging on Dataflow.
- "TestXLang_CoGroupBy",
- "TestXLang_Multi",
- "TestXLang_Partition",
- "TestXLang_Prefix",
}
// CheckFilters checks if an integration test is filtered to be skipped, either