| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package dataflow contains the Dataflow runner for submitting pipelines |
| // to Google Cloud Dataflow. |
| package dataflow |
| |
| import ( |
| "context" |
| "encoding/json" |
| "flag" |
| "fmt" |
| "io" |
| "path" |
| "strings" |
| "sync/atomic" |
| "time" |
| |
| "cloud.google.com/go/storage" |
| "github.com/apache/beam/sdks/go/pkg/beam" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" |
| "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" |
| "github.com/apache/beam/sdks/go/pkg/beam/log" |
| "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts" |
| "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts" |
| "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow/dataflowlib" |
| "github.com/apache/beam/sdks/go/pkg/beam/util/gcsx" |
| "github.com/apache/beam/sdks/go/pkg/beam/x/hooks/perf" |
| "github.com/golang/protobuf/proto" |
| ) |
| |
| // TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs. |
| |
// The flags below configure Dataflow job submission; Execute reads them when
// validating input and assembling dataflowlib.JobOptions.

// Job submission and staging flags.
var (
	endpoint            = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).")
	stagingLocation     = flag.String("staging_location", "", "GCS staging location (required).")
	tempLocation        = flag.String("temp_location", "", "Temp location (optional)")
	labels              = flag.String("labels", "", "JSON-formatted map[string]string of job labels (optional).")
	serviceAccountEmail = flag.String("service_account_email", "", "Service account email (optional).")
)

// Worker pool sizing and machine-shape flags.
var (
	numWorkers           = flag.Int64("num_workers", 0, "Number of workers (optional).")
	maxNumWorkers        = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
	autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
	diskSizeGb           = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).")
	machineType          = flag.String("worker_machine_type", "", "GCE machine type (optional)")
	minCPUPlatform       = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
)

// Worker placement and networking flags.
var (
	zone           = flag.String("zone", "", "GCP zone (optional)")
	workerRegion   = flag.String("worker_region", "", "Dataflow worker region (optional)")
	workerZone     = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
	network        = flag.String("network", "", "GCP network (optional)")
	subnetwork     = flag.String("subnetwork", "", "GCP subnetwork (optional)")
	noUsePublicIPs = flag.Bool("no_use_public_ips", false, "Workers must not use public IP addresses (optional)")
)

// Worker artifact flags.
var (
	image     = flag.String("worker_harness_container_image", "", "Worker harness container image (required).")
	workerJar = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
)

// Execution-control flags.
var (
	executeAsync   = flag.Bool("execute_async", false, "Asynchronous execution. Submit the job and return immediately.")
	dryRun         = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.")
	teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).")
)

// SDK options.
var (
	cpuProfiling     = flag.String("cpu_profiling", "", "Job records CPU profiles to this GCS location (optional)")
	sessionRecording = flag.String("session_recording", "", "Job records session transcripts")
)
| |
| func init() { |
| // Note that we also _ import harness/init to setup the remote execution hook. |
| beam.RegisterRunner("dataflow", Execute) |
| beam.RegisterRunner("DataflowRunner", Execute) |
| |
| perf.RegisterProfCaptureHook("gcs_profile_writer", gcsRecorderHook) |
| } |
| |
| var unique int32 |
| |
| // Execute runs the given pipeline on Google Cloud Dataflow. It uses the |
| // default application credentials to submit the job. |
| func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) { |
| // (1) Gather job options |
| |
| project := *gcpopts.Project |
| if project == "" { |
| return nil, errors.New("no Google Cloud project specified. Use --project=<project>") |
| } |
| region := gcpopts.GetRegion(ctx) |
| if region == "" { |
| return nil, errors.New("No Google Cloud region specified. Use --region=<region>. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints") |
| } |
| if *stagingLocation == "" { |
| return nil, errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>") |
| } |
| var jobLabels map[string]string |
| if *labels != "" { |
| if err := json.Unmarshal([]byte(*labels), &jobLabels); err != nil { |
| return nil, errors.Wrapf(err, "error reading --label flag as JSON") |
| } |
| } |
| |
| if *cpuProfiling != "" { |
| perf.EnableProfCaptureHook("gcs_profile_writer", *cpuProfiling) |
| } |
| |
| if *sessionRecording != "" { |
| // TODO(wcn): BEAM-4017 |
| // It's a bit inconvenient for GCS because the whole object is written in |
| // one pass, whereas the session logs are constantly appended. We wouldn't |
| // want to hold all the logs in memory to flush at the end of the pipeline |
| // as we'd blow out memory on the worker. The implementation of the |
| // CaptureHook should create an internal buffer and write chunks out to GCS |
| // once they get to an appropriate size (50M or so?) |
| } |
| if *autoscalingAlgorithm != "" { |
| if *autoscalingAlgorithm != "NONE" && *autoscalingAlgorithm != "THROUGHPUT_BASED" { |
| return nil, errors.New("invalid autoscaling algorithm. Use --autoscaling_algorithm=(NONE|THROUGHPUT_BASED)") |
| } |
| } |
| |
| hooks.SerializeHooksToOptions() |
| |
| experiments := jobopts.GetExperiments() |
| // Always use runner v2, unless set already. |
| var v2set bool |
| for _, e := range experiments { |
| if strings.Contains(e, "use_runner_v2") || strings.Contains(e, "use_unified_worker") { |
| v2set = true |
| break |
| } |
| } |
| if !v2set { |
| experiments = append(experiments, "use_unified_worker") |
| } |
| // TODO(BEAM-11779) remove shuffle_mode=appliance with runner v2 once issue is resolved. |
| experiments = append(experiments, "shuffle_mode=appliance") |
| |
| if *minCPUPlatform != "" { |
| experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform)) |
| } |
| |
| opts := &dataflowlib.JobOptions{ |
| Name: jobopts.GetJobName(), |
| Experiments: experiments, |
| Options: beam.PipelineOptions.Export(), |
| Project: project, |
| Region: region, |
| Zone: *zone, |
| Network: *network, |
| Subnetwork: *subnetwork, |
| NoUsePublicIPs: *noUsePublicIPs, |
| NumWorkers: *numWorkers, |
| MaxNumWorkers: *maxNumWorkers, |
| DiskSizeGb: *diskSizeGb, |
| Algorithm: *autoscalingAlgorithm, |
| MachineType: *machineType, |
| Labels: jobLabels, |
| ServiceAccountEmail: *serviceAccountEmail, |
| TempLocation: *tempLocation, |
| Worker: *jobopts.WorkerBinary, |
| WorkerJar: *workerJar, |
| WorkerRegion: *workerRegion, |
| WorkerZone: *workerZone, |
| TeardownPolicy: *teardownPolicy, |
| ContainerImage: getContainerImage(ctx), |
| } |
| if opts.TempLocation == "" { |
| opts.TempLocation = gcsx.Join(*stagingLocation, "tmp") |
| } |
| |
| // (1) Build and submit |
| // NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar". |
| id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano()) |
| |
| modelURL := gcsx.Join(*stagingLocation, id, "model") |
| workerURL := gcsx.Join(*stagingLocation, id, "worker") |
| jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar") |
| xlangURL := gcsx.Join(*stagingLocation, id, "xlang") |
| |
| edges, _, err := p.Build() |
| if err != nil { |
| return nil, err |
| } |
| artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, opts.Project, xlangURL) |
| if err != nil { |
| return nil, errors.WithContext(err, "resolving cross-language artifacts") |
| } |
| opts.ArtifactURLs = artifactURLs |
| environment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage) |
| if err != nil { |
| return nil, errors.WithContext(err, "creating environment for model pipeline") |
| } |
| model, err := graphx.Marshal(edges, &graphx.Options{Environment: environment}) |
| if err != nil { |
| return nil, errors.WithContext(err, "generating model pipeline") |
| } |
| err = pipelinex.ApplySdkImageOverrides(model, jobopts.GetSdkImageOverrides()) |
| if err != nil { |
| return nil, errors.WithContext(err, "applying container image overrides") |
| } |
| |
| if *dryRun { |
| log.Info(ctx, "Dry-run: not submitting job!") |
| |
| log.Info(ctx, proto.MarshalTextString(model)) |
| job, err := dataflowlib.Translate(ctx, model, opts, workerURL, jarURL, modelURL) |
| if err != nil { |
| return nil, err |
| } |
| dataflowlib.PrintJob(ctx, job) |
| return nil, nil |
| } |
| |
| return dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, modelURL, *endpoint, *executeAsync) |
| } |
| func gcsRecorderHook(opts []string) perf.CaptureHook { |
| bucket, prefix, err := gcsx.ParseObject(opts[0]) |
| if err != nil { |
| panic(fmt.Sprintf("Invalid hook configuration for gcsRecorderHook: %s", opts)) |
| } |
| |
| return func(ctx context.Context, spec string, r io.Reader) error { |
| client, err := gcsx.NewClient(ctx, storage.ScopeReadWrite) |
| if err != nil { |
| return errors.WithContext(err, "establishing GCS client") |
| } |
| return gcsx.WriteObject(ctx, client, bucket, path.Join(prefix, spec), r) |
| } |
| } |
| |
| func getContainerImage(ctx context.Context) string { |
| urn := jobopts.GetEnvironmentUrn(ctx) |
| if urn == "" || urn == "beam:env:docker:v1" { |
| if *image != "" { |
| return *image |
| } |
| return jobopts.GetEnvironmentConfig(ctx) |
| } |
| panic(fmt.Sprintf("Unsupported environment %v", urn)) |
| } |