blob: 53891d6ca5e8fb98e7e80ed7a111781e4ea15a86 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package exec contains runtime plan representation and execution. A pipeline
// must be translated to a runtime plan to be executed.
package exec
import (
"context"
"fmt"
"strings"
"sync"
"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
// Plan represents the bundle execution plan. It will generally be constructed
// from a part of a pipeline. A plan can be used to process multiple bundles
// serially.
type Plan struct {
	// id is the plan identifier returned by ID.
	id string
	// roots are the units that Execute drives through StartBundle, Process
	// and FinishBundle for every bundle.
	roots []Root
	// units holds every unit of the plan; all of them are brought Up on the
	// first Execute and taken Down by Down.
	units []Unit
	// parDoIDs are the PTransform IDs of the units implementing hasPID,
	// collected by NewPlan.
	parDoIDs []string
	// status is the lifecycle state: Initializing, Up, Active, Broken or Down.
	status Status
	// While the store is threadsafe, the reference to it
	// is not, so we need to protect the store field to be
	// able to asynchronously provide tentative metrics.
	storeMu sync.Mutex
	// store is the metrics store of the most recent Execute; guarded by storeMu.
	store *metrics.Store
	// source is the plan's DataSource unit, or nil if it has none. It backs
	// Progress, Split and SourcePTransformID.
	// TODO: there can be more than 1 DataSource in a bundle.
	source *DataSource
}
// hasPID provides a common interface for extracting PTransformIDs
// from Units. NewPlan records the ID of every unit that implements it.
type hasPID interface {
	// GetPID returns the PTransform ID associated with the unit.
	GetPID() string
}
// NewPlan assembles a bundle execution plan, identified by id, over units.
//
// Every unit implementing Root becomes a root of the plan. A *DataSource
// unit becomes the plan's source; if there are several, the last one wins.
// Units implementing hasPID contribute their PTransform ID to the plan.
//
// An error is returned if any unit is nil or if no unit is a Root.
func NewPlan(id string, units []Unit) (*Plan, error) {
	plan := &Plan{
		id:     id,
		status: Initializing,
		units:  units,
	}
	for _, unit := range units {
		if unit == nil {
			return nil, errors.Errorf("no <nil> units")
		}
		// A single unit may satisfy several of these checks (a DataSource
		// can also be a Root), so they are independent ifs, not a type switch.
		if root, isRoot := unit.(Root); isRoot {
			plan.roots = append(plan.roots, root)
		}
		if ds, isSource := unit.(*DataSource); isSource {
			plan.source = ds
		}
		if withPID, hasID := unit.(hasPID); hasID {
			plan.parDoIDs = append(plan.parDoIDs, withPID.GetPID())
		}
	}
	if len(plan.roots) == 0 {
		return nil, errors.Errorf("no root units")
	}
	return plan, nil
}
// ID reports the identifier the plan was created with in NewPlan.
func (p *Plan) ID() string { return p.id }
// SourcePTransformID returns the ID of the data's origin PTransform, i.e. the
// PTransform of the plan's DataSource. It returns the empty string when the
// plan has no DataSource, instead of dereferencing a nil source and panicking.
func (p *Plan) SourcePTransformID() string {
	if p.source == nil {
		return ""
	}
	return p.source.SID.PtransformID
}
// Execute executes the plan for the bundle identified by id, using manager as
// the data context. Units are brought up on the first execution. Each root is
// then driven through StartBundle, Process and FinishBundle, in that order.
//
// If any step fails, the plan is marked Broken and cannot be reused for
// further bundles. A plan that is not Up (for example Broken, Down, or
// currently Active) is rejected before any source state is touched.
//
// Does not panic: every unit call goes through callNoPanic. Blocking.
func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) error {
	// NOTE(review): metrics are keyed by the plan ID (p.id), not by the bundle
	// id argument; confirm this is the intended key.
	ctx = metrics.SetBundleID(ctx, p.id)
	p.storeMu.Lock()
	p.store = metrics.GetStore(ctx)
	p.storeMu.Unlock()

	if p.status == Initializing {
		for _, u := range p.units {
			if err := callNoPanic(ctx, u.Up); err != nil {
				p.status = Broken
				return errors.Wrapf(err, "while executing Up for %v", p)
			}
		}
		p.status = Up
	}
	// Validate the lifecycle state before resetting the source's split state,
	// so a plan that is about to be rejected is left untouched.
	if p.status != Up {
		return errors.Errorf("invalid status for plan %v: %v", p.id, p.status)
	}
	if p.source != nil {
		p.source.InitSplittable()
	}

	// Process bundle. If there are any kinds of failures, we bail and mark the plan broken.
	p.status = Active
	for _, root := range p.roots {
		// The closure is invoked synchronously by callNoPanic, so capturing
		// the loop variable is safe.
		if err := callNoPanic(ctx, func(ctx context.Context) error { return root.StartBundle(ctx, id, manager) }); err != nil {
			p.status = Broken
			return errors.Wrapf(err, "while executing StartBundle for %v", p)
		}
	}
	for _, root := range p.roots {
		if err := callNoPanic(ctx, root.Process); err != nil {
			p.status = Broken
			return errors.Wrapf(err, "while executing Process for %v", p)
		}
	}
	for _, root := range p.roots {
		if err := callNoPanic(ctx, root.FinishBundle); err != nil {
			p.status = Broken
			return errors.Wrapf(err, "while executing FinishBundle for %v", p)
		}
	}
	p.status = Up
	return nil
}
// Down tears down the plan by calling Down on every unit. It keeps going
// after individual failures so every unit gets the chance to release its
// resources. Calling Down on a plan that is already down is a no-op.
// Does not panic.
func (p *Plan) Down(ctx context.Context) error {
	if p.status == Down {
		return nil // ok: already down
	}
	p.status = Down

	var failures []error
	for _, unit := range p.units {
		err := callNoPanic(ctx, unit.Down)
		if err == nil {
			continue
		}
		failures = append(failures, err)
	}

	if len(failures) == 0 {
		return nil
	}
	if len(failures) == 1 {
		return errors.Wrapf(failures[0], "plan %v failed", p.id)
	}
	return errors.Errorf("plan %v failed with multiple errors: %v", p.id, failures)
}
// String renders the plan as a "Plan[<id>]:" header followed by one
// "<unit ID>: <unit>" line per unit. This is what %v prints for a *Plan, for
// example in the error messages produced by Execute.
func (p *Plan) String() string {
	lines := make([]string, 0, len(p.units))
	for _, unit := range p.units {
		lines = append(lines, fmt.Sprintf("%v: %v", unit.ID(), unit))
	}
	return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(lines, "\n"))
}
// Progress returns a snapshot of the plan's input progress, taken from its
// DataSource. The boolean is false, with a zero snapshot, when the plan has
// no DataSource to report on.
func (p *Plan) Progress() (ProgressReportSnapshot, bool) {
	if p.source == nil {
		return ProgressReportSnapshot{}, false
	}
	return p.source.Progress(), true
}
// Store returns the metric store recorded by the most recent Execute of this
// plan, or nil if the plan has never been executed. The field is read under
// storeMu so it can be called while a bundle is in flight.
func (p *Plan) Store() *metrics.Store {
	p.storeMu.Lock()
	current := p.store
	p.storeMu.Unlock()
	return current
}
// SplitPoints captures the split requested by the Runner.
type SplitPoints struct {
	// Splits is a list of desired split indices.
	Splits []int64
	// Frac is handed to DataSource.Split together with Splits and BufSize.
	// NOTE(review): presumably the fraction of the remaining work at which
	// the runner wants the split — confirm against DataSource.Split.
	Frac float64
	// BufSize is the estimated total number of elements (including unsent)
	// for the source. A zero value indicates unknown; the locally known size
	// should be used instead.
	BufSize int64
}
// Split takes a set of potential split indexes and forwards them, along with
// the requested fraction and buffer size, to the plan's DataSource. If
// successful, it returns the split index of the first element of the residual,
// on which processing will be halted.
//
// Returns an error when unable to split, including when the plan has no
// DataSource.
func (p *Plan) Split(s SplitPoints) (int64, error) {
	if p.source == nil {
		// %+v names the SplitPoints fields in the message; the previous
		// "{%v}" produced doubled, unlabeled braces.
		return 0, errors.Errorf("failed to split at requested splits: %+v, source not initialized", s)
	}
	return p.source.Split(s.Splits, s.Frac, s.BufSize)
}