| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package exec contains runtime plan representation and execution. A pipeline |
| // must be translated to a runtime plan to be executed. |
| package exec |
| |
| import ( |
| "context" |
| "fmt" |
| "strings" |
| "sync" |
| |
| "github.com/apache/beam/sdks/go/pkg/beam/core/metrics" |
| "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" |
| ) |
| |
| // Plan represents the bundle execution plan. It will generally be constructed |
| // from a part of a pipeline. A plan can be used to process multiple bundles |
| // serially. |
| type Plan struct { |
| id string |
| roots []Root |
| units []Unit |
| parDoIDs []string |
| |
| status Status |
| |
| // While the store is threadsafe, the reference to it |
| // is not, so we need to protect the store field to be |
| // able to asynchronously provide tentative metrics. |
| storeMu sync.Mutex |
| store *metrics.Store |
| |
| // TODO: there can be more than 1 DataSource in a bundle. |
| source *DataSource |
| } |
| |
| // hasPID provides a common interface for extracting PTransformIDs |
| // from Units. |
| type hasPID interface { |
| GetPID() string |
| } |
| |
| // NewPlan returns a new bundle execution plan from the given units. |
| func NewPlan(id string, units []Unit) (*Plan, error) { |
| var roots []Root |
| var source *DataSource |
| var pardoIDs []string |
| |
| for _, u := range units { |
| if u == nil { |
| return nil, errors.Errorf("no <nil> units") |
| } |
| if r, ok := u.(Root); ok { |
| roots = append(roots, r) |
| } |
| if s, ok := u.(*DataSource); ok { |
| source = s |
| } |
| if p, ok := u.(hasPID); ok { |
| pardoIDs = append(pardoIDs, p.GetPID()) |
| } |
| } |
| if len(roots) == 0 { |
| return nil, errors.Errorf("no root units") |
| } |
| |
| return &Plan{ |
| id: id, |
| status: Initializing, |
| roots: roots, |
| units: units, |
| parDoIDs: pardoIDs, |
| source: source, |
| }, nil |
| } |
| |
| // ID returns the plan identifier. |
| func (p *Plan) ID() string { |
| return p.id |
| } |
| |
| // SourcePTransformID returns the ID of the data's origin PTransform. |
| func (p *Plan) SourcePTransformID() string { |
| return p.source.SID.PtransformID |
| } |
| |
| // Execute executes the plan with the given data context and bundle id. Units |
| // are brought up on the first execution. If a bundle fails, the plan cannot |
| // be reused for further bundles. Does not panic. Blocking. |
| func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) error { |
| ctx = metrics.SetBundleID(ctx, p.id) |
| p.storeMu.Lock() |
| p.store = metrics.GetStore(ctx) |
| p.storeMu.Unlock() |
| if p.status == Initializing { |
| for _, u := range p.units { |
| if err := callNoPanic(ctx, u.Up); err != nil { |
| p.status = Broken |
| return errors.Wrapf(err, "while executing Up for %v", p) |
| } |
| } |
| p.status = Up |
| } |
| if p.source != nil { |
| p.source.InitSplittable() |
| } |
| |
| if p.status != Up { |
| return errors.Errorf("invalid status for plan %v: %v", p.id, p.status) |
| } |
| |
| // Process bundle. If there are any kinds of failures, we bail and mark the plan broken. |
| |
| p.status = Active |
| for _, root := range p.roots { |
| if err := callNoPanic(ctx, func(ctx context.Context) error { return root.StartBundle(ctx, id, manager) }); err != nil { |
| p.status = Broken |
| return errors.Wrapf(err, "while executing StartBundle for %v", p) |
| } |
| } |
| for _, root := range p.roots { |
| if err := callNoPanic(ctx, root.Process); err != nil { |
| p.status = Broken |
| return errors.Wrapf(err, "while executing Process for %v", p) |
| } |
| } |
| for _, root := range p.roots { |
| if err := callNoPanic(ctx, root.FinishBundle); err != nil { |
| p.status = Broken |
| return errors.Wrapf(err, "while executing FinishBundle for %v", p) |
| } |
| } |
| |
| p.status = Up |
| return nil |
| } |
| |
| // Down takes the plan and associated units down. Does not panic. |
| func (p *Plan) Down(ctx context.Context) error { |
| if p.status == Down { |
| return nil // ok: already down |
| } |
| p.status = Down |
| |
| var errs []error |
| for _, u := range p.units { |
| if err := callNoPanic(ctx, u.Down); err != nil { |
| errs = append(errs, err) |
| } |
| } |
| |
| switch len(errs) { |
| case 0: |
| return nil |
| case 1: |
| return errors.Wrapf(errs[0], "plan %v failed", p.id) |
| default: |
| return errors.Errorf("plan %v failed with multiple errors: %v", p.id, errs) |
| } |
| } |
| |
| func (p *Plan) String() string { |
| var units []string |
| for _, u := range p.units { |
| units = append(units, fmt.Sprintf("%v: %v", u.ID(), u)) |
| } |
| return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n")) |
| } |
| |
| // Progress returns a snapshot of input progress of the plan, and associated metrics. |
| func (p *Plan) Progress() (ProgressReportSnapshot, bool) { |
| if p.source != nil { |
| return p.source.Progress(), true |
| } |
| return ProgressReportSnapshot{}, false |
| } |
| |
| // Store returns the metric store for the last use of this plan. |
| func (p *Plan) Store() *metrics.Store { |
| p.storeMu.Lock() |
| defer p.storeMu.Unlock() |
| return p.store |
| } |
| |
| // SplitPoints captures the split requested by the Runner. |
| type SplitPoints struct { |
| // Splits is a list of desired split indices. |
| Splits []int64 |
| Frac float64 |
| |
| // Estimated total number of elements (including unsent) for the source. |
| // A zero value indicates unknown, instead use locally known size. |
| BufSize int64 |
| } |
| |
| // Split takes a set of potential split indexes, and if successful returns |
| // the split index of the first element of the residual, on which processing |
| // will be halted. |
| // Returns an error when unable to split. |
| func (p *Plan) Split(s SplitPoints) (int64, error) { |
| if p.source != nil { |
| return p.source.Split(s.Splits, s.Frac, s.BufSize) |
| } |
| return 0, fmt.Errorf("failed to split at requested splits: {%v}, Source not initialized", s) |
| } |