blob: dcc7922098bf7d33c12309141db0f5a717744cde [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package harness implements the SDK side of the Beam FnAPI.
package harness
import (
fnpb ""
// TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).
// Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
// ultimately responsible for correctly executing user code.
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
setupRemoteLogging(ctx, loggingEndpoint)
// Connect to FnAPI control server. Receive and execute work.
// TODO: setup data manager, DoFn register
conn, err := dial(ctx, controlEndpoint, 60*time.Second)
if err != nil {
return errors.Wrap(err, "failed to connect")
defer conn.Close()
client, err := fnpb.NewBeamFnControlClient(conn).Control(ctx)
if err != nil {
return errors.Wrapf(err, "failed to connect to control service")
log.Debugf(ctx, "Successfully connected to control @ %v", controlEndpoint)
// Each ProcessBundle is a sub-graph of the original one.
var wg sync.WaitGroup
respc := make(chan *fnpb.InstructionResponse, 100)
// gRPC requires all writers to a stream be the same goroutine, so this is the
// goroutine for managing responses back to the control service.
go func() {
defer wg.Done()
for resp := range respc {
log.Debugf(ctx, "RESP: %v", proto.MarshalTextString(resp))
if err := client.Send(resp); err != nil {
log.Errorf(ctx, "control.Send: Failed to respond: %v", err)
ctrl := &control{
plans: make(map[string]*exec.Plan),
active: make(map[string]*exec.Plan),
data: &DataChannelManager{},
state: &StateChannelManager{},
// gRPC requires all readers of a stream be the same goroutine, so this goroutine
// is responsible for managing the network data. All it does is pull data from
// the stream, and hand off the message to a goroutine to actually be handled,
// so as to avoid blocking the underlying network channel.
for {
req, err := client.Recv()
if err != nil {
if err == io.EOF {
return nil
return errors.Wrapf(err, "control.Recv failed")
// Launch a goroutine to handle the control message.
// TODO(wcn): implement a rate limiter for 'heavy' messages?
fn := func(ctx context.Context, req *fnpb.InstructionRequest) {
log.Debugf(ctx, "RECV: %v", proto.MarshalTextString(req))
ctx = hooks.RunRequestHooks(ctx, req)
resp := ctrl.handleInstruction(ctx, req)
hooks.RunResponseHooks(ctx, req, resp)
if resp != nil {
respc <- resp
if req.GetProcessBundle() != nil {
// Only process bundles in a goroutine. We at least need to process instructions for
// each plan serially. Perhaps just invoke plan.Execute async?
go fn(ctx, req)
} else {
fn(ctx, req)
type control struct {
// plans that are candidates for execution.
plans map[string]*exec.Plan // protected by mu
// plans that are actively being executed.
// a plan can only be in one of these maps at any time.
active map[string]*exec.Plan // protected by mu
mu sync.Mutex
data *DataChannelManager
state *StateChannelManager
func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {
id := req.GetInstructionId()
ctx = setInstID(ctx, id)
switch {
case req.GetRegister() != nil:
msg := req.GetRegister()
for _, desc := range msg.GetProcessBundleDescriptor() {
p, err := exec.UnmarshalPlan(desc)
if err != nil {
return fail(id, "Invalid bundle desc: %v", err)
pid := desc.GetId()
log.Debugf(ctx, "Plan %v: %v", pid, p)
c.plans[pid] = p
return &fnpb.InstructionResponse{
InstructionId: id,
Response: &fnpb.InstructionResponse_Register{
Register: &fnpb.RegisterResponse{},
case req.GetProcessBundle() != nil:
msg := req.GetProcessBundle()
// NOTE: the harness sends a 0-length process bundle request to sources (changed?)
log.Debugf(ctx, "PB: %v", msg)
ref := msg.GetProcessBundleDescriptorId()
plan, ok := c.plans[ref]
// Make the plan active, and remove it from candidates
// since a plan can't be run concurrently.[id] = plan
delete(c.plans, ref)
if !ok {
return fail(id, "execution plan for %v not found", ref)
data := NewScopedDataManager(, id)
state := NewScopedStateReader(c.state, id)
err := plan.Execute(ctx, id, exec.DataContext{Data: data, State: state})
m := plan.Metrics()
// Move the plan back to the candidate state
c.plans[plan.ID()] = plan
delete(, id)
if err != nil {
return fail(id, "execute failed: %v", err)
return &fnpb.InstructionResponse{
InstructionId: id,
Response: &fnpb.InstructionResponse_ProcessBundle{
ProcessBundle: &fnpb.ProcessBundleResponse{
Metrics: m,
case req.GetProcessBundleProgress() != nil:
msg := req.GetProcessBundleProgress()
// log.Debugf(ctx, "PB Progress: %v", msg)
ref := msg.GetInstructionId()
plan, ok :=[ref]
if !ok {
return fail(id, "execution plan for %v not found", ref)
m := plan.Metrics()
return &fnpb.InstructionResponse{
InstructionId: id,
Response: &fnpb.InstructionResponse_ProcessBundleProgress{
ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{
Metrics: m,
case req.GetProcessBundleSplit() != nil:
msg := req.GetProcessBundleSplit()
log.Debugf(ctx, "PB Split: %v", msg)
ref := msg.GetInstructionId()
plan, ok :=[ref]
if !ok {
return fail(id, "execution plan for %v not found", ref)
// Get the desired splits for the root FnAPI read operation.
ds := msg.GetDesiredSplits()[plan.SourcePTransformID()]
if ds == nil {
return fail(id, "failed to split: desired splits for root was empty.")
split, err := plan.Split(exec.SplitPoints{ds.GetAllowedSplitPoints(), ds.GetFractionOfRemainder()})
if err != nil {
return fail(id, "unable to split: %v", err)
return &fnpb.InstructionResponse{
InstructionId: id,
Response: &fnpb.InstructionResponse_ProcessBundleSplit{
ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{
ChannelSplits: []*fnpb.ProcessBundleSplitResponse_ChannelSplit{
LastPrimaryElement: int32(split - 1),
FirstResidualElement: int32(split),
return fail(id, "Unexpected request: %v", req)
func fail(id, format string, args ...interface{}) *fnpb.InstructionResponse {
dummy := &fnpb.InstructionResponse_Register{Register: &fnpb.RegisterResponse{}}
return &fnpb.InstructionResponse{
InstructionId: id,
Error: fmt.Sprintf(format, args...),
Response: dummy,
// dial to the specified endpoint. if timeout <=0, call blocks until
// grpc.Dial succeeds.
func dial(ctx context.Context, endpoint string, timeout time.Duration) (*grpc.ClientConn, error) {
log.Infof(ctx, "Connecting via grpc @ %s ...", endpoint)
return grpcx.Dial(ctx, endpoint, timeout)