// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package direct contains the direct runner for running single-bundle
// pipelines in the current process. Useful for testing.
package direct

import (
	"context"
	"path"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
	"github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
	"github.com/apache/beam/sdks/go/pkg/beam/runners/vet"
)

func init() {
	beam.RegisterRunner("direct", Execute)
}

// Execute runs the pipeline in-process.
func Execute(ctx context.Context, p *beam.Pipeline) error {
	log.Info(ctx, "Executing pipeline with the direct runner.")

	if !beam.Initialized() {
		log.Warn(ctx, "Beam has not been initialized. Call beam.Init() before pipeline construction.")
	}

	log.Info(ctx, "Pipeline:")
	log.Info(ctx, p)

	if *jobopts.Strict {
		log.Info(ctx, "Strict mode enabled, applying additional validation.")
		if err := vet.Execute(ctx, p); err != nil {
			return errors.Wrap(err, "strictness check failed")
		}
		log.Info(ctx, "Strict mode validation passed.")
	}

	edges, _, err := p.Build()
	if err != nil {
		return errors.Wrap(err, "invalid pipeline")
	}
	plan, err := Compile(edges)
	if err != nil {
		return errors.Wrap(err, "translation failed")
	}
	log.Info(ctx, plan)

	if err = plan.Execute(ctx, "", exec.DataContext{}); err != nil {
		plan.Down(ctx) // ignore any teardown errors
		return err
	}
	if err = plan.Down(ctx); err != nil {
		return err
	}
	metrics.DumpToLog(ctx)
	return nil
}

// Compile translates a pipeline to a multi-bundle execution plan.
func Compile(edges []*graph.MultiEdge) (*exec.Plan, error) {
	// (1) Preprocess graph structure to allow insertion of Multiplex,
	// Flatten and Discard.

	prev := make(map[int]int)      // nodeID -> #incoming
	succ := make(map[int][]linkID) // nodeID -> []linkID
	edgeMap := make(map[int]*graph.MultiEdge)

	for _, edge := range edges {
		edgeMap[edge.ID()] = edge
		for i, in := range edge.Input {
			from := in.From.ID()
			succ[from] = append(succ[from], linkID{edge.ID(), i})
		}
		for _, out := range edge.Output {
			to := out.To.ID()
			prev[to]++
		}
	}

	// (2) Constructs the plan units recursively.

	b := &builder{
		prev:  prev,
		succ:  succ,
		edges: edgeMap,
		nodes: make(map[int]exec.Node),
		links: make(map[linkID]exec.Node),
		idgen: &exec.GenID{},
	}

	var roots []exec.Unit

	for _, edge := range edges {
		switch edge.Op {
		case graph.Impulse:
			out, err := b.makeNode(edge.Output[0].To.ID())
			if err != nil {
				return nil, err
			}

			u := &Impulse{UID: b.idgen.New(), Value: edge.Value, Out: out}
			roots = append(roots, u)

		default:
			// skip non-roots
		}
	}

	return exec.NewPlan("plan", append(roots, b.units...))
}

// linkID represents an incoming data link to an Edge.
type linkID struct {
	to    int // graph.MultiEdge
	input int // input index. If > 0, it's a side or CoGBK input.
}

// builder is the recursive builder for non-root execution nodes.
type builder struct {
	prev  map[int]int              // nodeID -> #incoming
	succ  map[int][]linkID         // nodeID -> []linkID
	edges map[int]*graph.MultiEdge // edgeID -> Edge

	nodes map[int]exec.Node    // nodeID -> Node (cache)
	links map[linkID]exec.Node // linkID -> Node (cache)

	units []exec.Unit // result
	idgen *exec.GenID
}

func (b *builder) makeNodes(out []*graph.Outbound) ([]exec.Node, error) {
	var ret []exec.Node
	for _, o := range out {
		n, err := b.makeNode(o.To.ID())
		if err != nil {
			return nil, err
		}
		ret = append(ret, n)
	}
	return ret, nil
}

func (b *builder) makeNode(id int) (exec.Node, error) {
	if n, ok := b.nodes[id]; ok {
		return n, nil
	}

	list := b.succ[id]

	var u exec.Node
	switch len(list) {
	case 0:
		// Discard.

		u = &exec.Discard{UID: b.idgen.New()}

	case 1:
		return b.makeLink(list[0])

	default:
		// Multiplex.

		out, err := b.makeLinks(list)
		if err != nil {
			return nil, err
		}
		u = &exec.Multiplex{UID: b.idgen.New(), Out: out}
	}

	if count := b.prev[id]; count > 1 {
		// Guard node with Flatten, if needed.

		b.units = append(b.units, u)
		u = &exec.Flatten{UID: b.idgen.New(), N: count, Out: u}
	}

	b.nodes[id] = u
	b.units = append(b.units, u)
	return u, nil
}

func (b *builder) makeLinks(ids []linkID) ([]exec.Node, error) {
	var ret []exec.Node
	for _, id := range ids {
		n, err := b.makeLink(id)
		if err != nil {
			return nil, err
		}
		ret = append(ret, n)
	}
	return ret, nil
}

func (b *builder) makeLink(id linkID) (exec.Node, error) {
	if n, ok := b.links[id]; ok {
		return n, nil
	}

	// Process all incoming links for the edge and cache them. It thus doesn't matter
	// which exact link triggers the Node generation. The link caching is only needed
	// to process ParDo side inputs and CoGBK.

	edge := b.edges[id.to]

	out, err := b.makeNodes(edge.Output)
	if err != nil {
		return nil, err
	}

	var u exec.Node
	switch edge.Op {
	case graph.ParDo:
		pardo := &exec.ParDo{
			UID:     b.idgen.New(),
			Fn:      edge.DoFn,
			Inbound: edge.Input,
			Out:     out,
			PID:     path.Base(edge.DoFn.Name()),
		}
		if len(edge.Input) == 1 {
			u = pardo
			break
		}

		// ParDo w/ side input. We need to insert buffering and wait. We also need to
		// ensure that we return the correct link node.

		b.units = append(b.units, pardo)

		w := &wait{UID: b.idgen.New(), need: len(edge.Input) - 1, next: pardo}
		b.units = append(b.units, w)
		b.links[linkID{edge.ID(), 0}] = w

		for i := 1; i < len(edge.Input); i++ {
			n := &buffer{uid: b.idgen.New(), next: w.ID(), read: pardo.ID(), notify: w.notify}
			pardo.Side = append(pardo.Side, n)

			b.units = append(b.units, n)
			b.links[linkID{edge.ID(), i}] = n
		}

		return b.links[id], nil

	case graph.Combine:
		usesKey := typex.IsKV(edge.Input[0].Type)

		u = &exec.Combine{
			UID:     b.idgen.New(),
			Fn:      edge.CombineFn,
			UsesKey: usesKey,
			Out:     out[0],
			PID:     path.Base(edge.CombineFn.Name()),
		}

	case graph.CoGBK:
		u = &CoGBK{UID: b.idgen.New(), Edge: edge, Out: out[0]}
		b.units = append(b.units, u)

		// CoGBK needs injection of each incoming index. If > 1 incoming,
		// insert Flatten as well.

		if len(edge.Input) > 1 {
			u = &exec.Flatten{UID: b.idgen.New(), N: len(edge.Input), Out: u}
			b.units = append(b.units, u)
		}

		for i := 0; i < len(edge.Input); i++ {
			n := &Inject{UID: b.idgen.New(), N: i, Out: u}

			b.units = append(b.units, n)
			b.links[linkID{edge.ID(), i}] = n
		}

		return b.links[id], nil

	case graph.Flatten:
		u = &exec.Flatten{UID: b.idgen.New(), N: len(edge.Input), Out: out[0]}

		for i := 0; i < len(edge.Input); i++ {
			b.links[linkID{edge.ID(), i}] = u
		}

	case graph.WindowInto:
		u = &exec.WindowInto{UID: b.idgen.New(), Fn: edge.WindowFn, Out: out[0]}

	default:
		return nil, errors.Errorf("unexpected edge: %v", edge)
	}

	b.links[id] = u
	b.units = append(b.units, u)
	return u, nil
}
