[BEAM-9920][BEAM-11418] Go Xlang artifact staging to GCP w/ Dataflow (#14748)

Adjust the Go Dataflow runner to stage cross-language artifacts to GCP as well, and adds them to the list of packages for workers. This fixes a few failing cross-language pipelines that required artifacts to be read. Also refactors the artifact staging code slightly to allow for staging remotely in a more generic fashion, and not just to GCP.
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
index 9ea5099..1ba1ac4 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
@@ -24,53 +24,105 @@
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
 // ResolveArtifacts acquires all dependencies for a cross-language transform
 func ResolveArtifacts(ctx context.Context, edges []*graph.MultiEdge, p *pipepb.Pipeline) {
-	path, err := filepath.Abs("/tmp/artifacts")
+	_, err := ResolveArtifactsWithConfig(ctx, edges, ResolveConfig{})
 	if err != nil {
 		panic(err)
 	}
+}
+
+// ResolveConfig contains fields for configuring the behavior for resolving
+// artifacts.
+type ResolveConfig struct {
+	// SdkPath replaces the default filepath for dependencies, but only in the
+	// external environment proto to be used by the SDK Harness during pipeline
+	// execution. This is used to specify alternate staging directories, such
+	// as for staging artifacts remotely.
+	//
+	// Setting an SdkPath does not change staging behavior otherwise. All
+	// artifacts still get staged to the default local filepath, and it is the
+	// user's responsibility to stage those local artifacts to the SdkPath.
+	SdkPath string
+
+	// JoinFn is a function for combining SdkPath and individual artifact names.
+	// If not specified, it defaults to using filepath.Join.
+	JoinFn func(path, name string) string
+}
+
+func defaultJoinFn(path, name string) string {
+	return filepath.Join(path, "/", name)
+}
+
+// ResolveArtifactsWithConfig acquires all dependencies for cross-language
+// transforms, but with some additional configuration to behavior. By default,
+// this function performs the following steps for each cross-language transform
+// in the list of edges:
+//   1. Retrieves a list of dependencies needed from the expansion service.
+//   2. Retrieves each dependency as an artifact and stages it to a default
+//      local filepath.
+//   3. Adds the dependencies to the transform's stored environment proto.
+// The changes that can be configured are documented in ResolveConfig.
+//
+// This returns a map of "local path" to "sdk path". By default these are
+// identical, unless ResolveConfig.SdkPath has been set.
+func ResolveArtifactsWithConfig(ctx context.Context, edges []*graph.MultiEdge, cfg ResolveConfig) (paths map[string]string, err error) {
+	tmpPath, err := filepath.Abs("/tmp/artifacts")
+	if err != nil {
+		return nil, errors.WithContext(err, "resolving remote artifacts")
+	}
+	if cfg.JoinFn == nil {
+		cfg.JoinFn = defaultJoinFn
+	}
+	paths = make(map[string]string)
 	for _, e := range edges {
 		if e.Op == graph.External {
 			components, err := graphx.ExpandedComponents(e.External.Expanded)
 			if err != nil {
-				panic(err)
+				return nil, errors.WithContextf(err,
+					"resolving remote artifacts for edge %v", e.Name())
 			}
 			envs := components.Environments
 			for eid, env := range envs {
-
 				if strings.HasPrefix(eid, "go") {
 					continue
 				}
 				deps := env.GetDependencies()
-				resolvedArtifacts, err := artifact.Materialize(ctx, e.External.ExpansionAddr, deps, "", path)
+				resolvedArtifacts, err := artifact.Materialize(ctx, e.External.ExpansionAddr, deps, "", tmpPath)
 				if err != nil {
-					panic(err)
+					return nil, errors.WithContextf(err,
+						"resolving remote artifacts for env %v in edge %v", eid, e.Name())
 				}
 
 				var resolvedDeps []*pipepb.ArtifactInformation
 				for _, a := range resolvedArtifacts {
-					name, sha256 := artifact.MustExtractFilePayload(a)
-					fullPath := filepath.Join(path, "/", name)
+					name, _ := artifact.MustExtractFilePayload(a)
+					fullTmpPath := filepath.Join(tmpPath, "/", name)
+					fullSdkPath := fullTmpPath
+					if len(cfg.SdkPath) > 0 {
+						fullSdkPath = cfg.JoinFn(cfg.SdkPath, name)
+					}
 					resolvedDeps = append(resolvedDeps,
 						&pipepb.ArtifactInformation{
 							TypeUrn: "beam:artifact:type:file:v1",
 							TypePayload: protox.MustEncode(
 								&pipepb.ArtifactFilePayload{
-									Path:   fullPath,
-									Sha256: sha256,
+									Path: fullSdkPath,
 								},
 							),
 							RoleUrn:     a.RoleUrn,
 							RolePayload: a.RolePayload,
 						},
 					)
+					paths[fullTmpPath] = fullSdkPath
 				}
 				env.Dependencies = resolvedDeps
 			}
 		}
 	}
+	return paths, nil
 }
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index c46ff4c..cd7be52 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -178,11 +178,23 @@
 	}
 
 	// (1) Build and submit
+	// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
+	id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())
+
+	modelURL := gcsx.Join(*stagingLocation, id, "model")
+	workerURL := gcsx.Join(*stagingLocation, id, "worker")
+	jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
+	xlangURL := gcsx.Join(*stagingLocation, id, "xlang")
 
 	edges, _, err := p.Build()
 	if err != nil {
 		return nil, err
 	}
+	artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, opts.Project, xlangURL)
+	if err != nil {
+		return nil, errors.WithContext(err, "resolving cross-language artifacts")
+	}
+	opts.ArtifactURLs = artifactURLs
 	environment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)
 	if err != nil {
 		return nil, errors.WithContext(err, "creating environment for model pipeline")
@@ -196,13 +208,6 @@
 		return nil, errors.WithContext(err, "applying container image overrides")
 	}
 
-	// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
-	id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())
-
-	modelURL := gcsx.Join(*stagingLocation, id, "model")
-	workerURL := gcsx.Join(*stagingLocation, id, "worker")
-	jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
-
 	if *dryRun {
 		log.Info(ctx, "Dry-run: not submitting job!")
 
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 511b962..390082b 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -56,6 +56,7 @@
 	WorkerRegion        string
 	WorkerZone          string
 	ContainerImage      string
+	ArtifactURLs        []string // Additional packages for workers.
 
 	// Autoscaling settings
 	Algorithm     string
@@ -128,6 +129,15 @@
 		experiments = append(experiments, "use_staged_dataflow_worker_jar")
 	}
 
+	for _, url := range opts.ArtifactURLs {
+		name := url[strings.LastIndexAny(url, "/")+1:]
+		pkg := &df.Package{
+			Name:     name,
+			Location: url,
+		}
+		packages = append(packages, pkg)
+	}
+
 	ipConfiguration := "WORKER_IP_UNSPECIFIED"
 	if opts.NoUsePublicIPs {
 		ipConfiguration = "WORKER_IP_PRIVATE"
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
index 67a2bde..49ca5bf 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
@@ -22,6 +22,8 @@
 	"os"
 
 	"cloud.google.com/go/storage"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/xlangx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
 )
@@ -54,3 +56,28 @@
 	_, err = gcsx.Upload(ctx, client, project, bucket, obj, r)
 	return err
 }
+
+// ResolveXLangArtifacts resolves cross-language artifacts with a given GCS
+// URL as a destination, and then stages all local artifacts to that URL. This
+// function returns a list of staged artifact URLs.
+func ResolveXLangArtifacts(ctx context.Context, edges []*graph.MultiEdge, project, url string) ([]string, error) {
+	cfg := xlangx.ResolveConfig{
+		SdkPath: url,
+		JoinFn: func(url, name string) string {
+			return gcsx.Join(url, "/", name)
+		},
+	}
+	paths, err := xlangx.ResolveArtifactsWithConfig(ctx, edges, cfg)
+	if err != nil {
+		return nil, err
+	}
+	var urls []string
+	for local, remote := range paths {
+		err := StageFile(ctx, project, remote, local)
+		if err != nil {
+			return nil, errors.WithContextf(err, "staging file to %v", remote)
+		}
+		urls = append(urls, remote)
+	}
+	return urls, nil
+}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index f01a71f..af14b38 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -79,6 +79,9 @@
 		getEnvCfg = srv.EnvironmentConfig
 	}
 
+	// Fetch all dependencies for cross-language transforms
+	xlangx.ResolveArtifacts(ctx, edges, nil)
+
 	environment, err := graphx.CreateEnvironment(ctx, envUrn, getEnvCfg)
 	if err != nil {
 		return nil, errors.WithContextf(err, "generating model pipeline")
@@ -88,9 +91,6 @@
 		return nil, errors.WithContextf(err, "generating model pipeline")
 	}
 
-	// Fetch all dependencies for cross-language transforms
-	xlangx.ResolveArtifacts(ctx, edges, pipeline)
-
 	log.Info(ctx, proto.MarshalTextString(pipeline))
 
 	opt := &runnerlib.JobOptions{
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index c756e05..7db20ac 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -78,12 +78,6 @@
 var dataflowFilters = []string{
 	// TODO(BEAM-11576): TestFlattenDup failing on this runner.
 	"TestFlattenDup",
-	// TODO(BEAM-11418): These tests require implementing x-lang artifact
-	// staging on Dataflow.
-	"TestXLang_CoGroupBy",
-	"TestXLang_Multi",
-	"TestXLang_Partition",
-	"TestXLang_Prefix",
 }
 
 // CheckFilters checks if an integration test is filtered to be skipped, either