// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package graphx

import (
	"fmt"

	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
	v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
	"github.com/golang/protobuf/proto"
	"github.com/golang/protobuf/ptypes"
)

// Model constants for interfacing with a Beam runner.
// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto
const (
	URNImpulse       = "beam:transform:impulse:v1"
	URNParDo         = "urn:beam:transform:pardo:v1"
	URNFlatten       = "beam:transform:flatten:v1"
	URNGBK           = "beam:transform:group_by_key:v1"
	URNCombinePerKey = "beam:transform:combine_per_key:v1"
	URNWindow        = "beam:transform:window:v1"

	// URNIterableSideInput = "beam:side_input:iterable:v1"
	URNMultimapSideInput = "beam:side_input:multimap:v1"

	URNGlobalWindowsWindowFn  = "beam:windowfn:global_windows:v0.1"
	URNFixedWindowsWindowFn   = "beam:windowfn:fixed_windows:v0.1"
	URNSlidingWindowsWindowFn = "beam:windowfn:sliding_windows:v0.1"
	URNSessionsWindowFn       = "beam:windowfn:session_windows:v0.1"

	// SDK constants

	// URNJavaDoFn is the legacy constant for marking a DoFn.
	// TODO: remove URNJavaDoFN when the Dataflow runner
	// uses the model pipeline and no longer falls back to Java.
	URNJavaDoFn = "urn:beam:dofn:javasdk:0.1"
	URNDoFn     = "beam:go:transform:dofn:v1"

	URNIterableSideInputKey = "beam:go:transform:iterablesideinputkey:v1"
)

// TODO(herohde) 11/6/2017: move some of the configuration into the graph during construction.

// Options for marshalling a graph into a model pipeline.
type Options struct {
	// Environment used to run the user code.
	Environment pb.Environment
}

// Marshal converts a graph to a model pipeline.
func Marshal(edges []*graph.MultiEdge, opt *Options) (*pb.Pipeline, error) {
	tree := NewScopeTree(edges)

	m := newMarshaller(opt)
	for _, edge := range tree.Edges {
		m.addMultiEdge(edge)
	}
	for _, t := range tree.Children {
		m.addScopeTree(t)
	}

	p := &pb.Pipeline{
		Components: m.build(),
	}
	return pipelinex.Normalize(p)
}

type marshaller struct {
	opt *Options

	transforms   map[string]*pb.PTransform
	pcollections map[string]*pb.PCollection
	windowing    map[string]*pb.WindowingStrategy
	environments map[string]*pb.Environment

	coders *CoderMarshaller

	windowing2id map[string]string
}

func newMarshaller(opt *Options) *marshaller {
	return &marshaller{
		opt:          opt,
		transforms:   make(map[string]*pb.PTransform),
		pcollections: make(map[string]*pb.PCollection),
		windowing:    make(map[string]*pb.WindowingStrategy),
		environments: make(map[string]*pb.Environment),
		coders:       NewCoderMarshaller(),
		windowing2id: make(map[string]string),
	}
}

func (m *marshaller) build() *pb.Components {
	return &pb.Components{
		Transforms:          m.transforms,
		Pcollections:        m.pcollections,
		WindowingStrategies: m.windowing,
		Environments:        m.environments,
		Coders:              m.coders.Build(),
	}
}

func (m *marshaller) addScopeTree(s *ScopeTree) string {
	id := scopeID(s.Scope.Scope)
	if _, exists := m.transforms[id]; exists {
		return id
	}

	var subtransforms []string
	for _, edge := range s.Edges {
		subtransforms = append(subtransforms, m.addMultiEdge(edge))
	}
	for _, tree := range s.Children {
		subtransforms = append(subtransforms, m.addScopeTree(tree))
	}

	transform := &pb.PTransform{
		UniqueName:    s.Scope.Name,
		Subtransforms: subtransforms,
	}

	m.updateIfCombineComposite(s, transform)

	m.transforms[id] = transform
	return id
}

// updateIfCombineComposite examines the scope tree and sets the PTransform Spec
// to be a CombinePerKey with a CombinePayload if it's a liftable composite.
// Beam Portability requires that composites contain an implementation for runners
// that don't understand the URN and Payload, which this lightly checks for.
func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pb.PTransform) {
	if s.Scope.Name != graph.CombinePerKeyScope ||
		len(s.Edges) != 2 ||
		len(s.Edges[0].Edge.Input) != 1 ||
		len(s.Edges[1].Edge.Output) != 1 ||
		s.Edges[1].Edge.Op != graph.Combine {
		return
	}

	edge := s.Edges[1].Edge
	acID := m.coders.Add(edge.AccumCoder)
	payload := &pb.CombinePayload{
		CombineFn: &pb.SdkFunctionSpec{
			Spec: &pb.FunctionSpec{
				Urn:     URNJavaDoFn,
				Payload: []byte(mustEncodeMultiEdgeBase64(edge)),
			},
			EnvironmentId: m.addDefaultEnv(),
		},
		AccumulatorCoderId: acID,
	}
	transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
}

func (m *marshaller) addMultiEdge(edge NamedEdge) string {
	id := edgeID(edge.Edge)
	if _, exists := m.transforms[id]; exists {
		return id
	}

	if edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1 {
		return m.expandCoGBK(edge)
	}

	inputs := make(map[string]string)
	for i, in := range edge.Edge.Input {
		m.addNode(in.From)
		inputs[fmt.Sprintf("i%v", i)] = nodeID(in.From)
	}
	outputs := make(map[string]string)
	for i, out := range edge.Edge.Output {
		m.addNode(out.To)
		outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
	}

	var spec *pb.FunctionSpec
	switch edge.Edge.Op {
	case graph.Impulse:
		// TODO(herohde) 7/18/2018: Encode data?
		spec = &pb.FunctionSpec{Urn: URNImpulse}

	case graph.ParDo:
		si := make(map[string]*pb.SideInput)
		for i, in := range edge.Edge.Input {
			switch in.Kind {
			case graph.Main:
				// ignore: not a side input

			case graph.Singleton, graph.Slice, graph.Iter, graph.ReIter:
				// The only supported form of side input is MultiMap, but we
				// want just iteration. So we must manually add a fixed key,
				// "", even if the input is already KV.

				out := fmt.Sprintf("%v_keyed%v_%v", nodeID(in.From), edgeID(edge.Edge), i)
				m.makeNode(out, m.coders.Add(makeBytesKeyedCoder(in.From.Coder)), in.From)

				payload := &pb.ParDoPayload{
					DoFn: &pb.SdkFunctionSpec{
						Spec: &pb.FunctionSpec{
							Urn: URNIterableSideInputKey,
							Payload: []byte(protox.MustEncodeBase64(&v1.TransformPayload{
								Urn: URNIterableSideInputKey,
							})),
						},
						EnvironmentId: m.addDefaultEnv(),
					},
				}

				keyedID := fmt.Sprintf("%v_keyed%v", edgeID(edge.Edge), i)
				keyed := &pb.PTransform{
					UniqueName: keyedID,
					Spec: &pb.FunctionSpec{
						Urn:     URNParDo,
						Payload: protox.MustEncode(payload),
					},
					Inputs:  map[string]string{"i0": nodeID(in.From)},
					Outputs: map[string]string{"i0": out},
				}
				m.transforms[keyedID] = keyed

				// Fixup input map
				inputs[fmt.Sprintf("i%v", i)] = out

				si[fmt.Sprintf("i%v", i)] = &pb.SideInput{
					AccessPattern: &pb.FunctionSpec{
						Urn: URNMultimapSideInput,
					},
					ViewFn: &pb.SdkFunctionSpec{
						Spec: &pb.FunctionSpec{
							Urn: "foo",
						},
						EnvironmentId: m.addDefaultEnv(),
					},
					WindowMappingFn: &pb.SdkFunctionSpec{
						Spec: &pb.FunctionSpec{
							Urn: "bar",
						},
						EnvironmentId: m.addDefaultEnv(),
					},
				}

			case graph.Map, graph.MultiMap:
				panic("NYI")

			default:
				panic(fmt.Sprintf("unexpected input kind: %v", edge))
			}
		}

		payload := &pb.ParDoPayload{
			DoFn: &pb.SdkFunctionSpec{
				Spec: &pb.FunctionSpec{
					Urn:     URNJavaDoFn,
					Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
				},
				EnvironmentId: m.addDefaultEnv(),
			},
			SideInputs: si,
		}
		spec = &pb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}

	case graph.Combine:
		payload := &pb.ParDoPayload{
			DoFn: &pb.SdkFunctionSpec{
				Spec: &pb.FunctionSpec{
					Urn:     URNJavaDoFn,
					Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
				},
				EnvironmentId: m.addDefaultEnv(),
			},
		}
		spec = &pb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}

	case graph.Flatten:
		spec = &pb.FunctionSpec{Urn: URNFlatten}

	case graph.CoGBK:
		spec = &pb.FunctionSpec{Urn: URNGBK}

	case graph.WindowInto:
		payload := &pb.WindowIntoPayload{
			WindowFn: &pb.SdkFunctionSpec{
				Spec: makeWindowFn(edge.Edge.WindowFn),
			},
		}
		spec = &pb.FunctionSpec{Urn: URNWindow, Payload: protox.MustEncode(payload)}

	case graph.External:
		spec = &pb.FunctionSpec{Urn: edge.Edge.Payload.URN, Payload: edge.Edge.Payload.Data}

	default:
		panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
	}

	transform := &pb.PTransform{
		UniqueName: edge.Name,
		Spec:       spec,
		Inputs:     inputs,
		Outputs:    outputs,
	}
	m.transforms[id] = transform
	return id
}

func (m *marshaller) expandCoGBK(edge NamedEdge) string {
	// TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have to translate
	// CoGBK with multiple PCollections as described in cogbk.go.

	id := edgeID(edge.Edge)
	kvCoderID := m.coders.Add(MakeKVUnionCoder(edge.Edge))
	gbkCoderID := m.coders.Add(MakeGBKUnionCoder(edge.Edge))

	var subtransforms []string

	inputs := make(map[string]string)
	for i, in := range edge.Edge.Input {
		m.addNode(in.From)

		out := fmt.Sprintf("%v_%v_inject%v", nodeID(in.From), id, i)
		m.makeNode(out, kvCoderID, in.From)

		// Inject(i)

		injectID := fmt.Sprintf("%v_inject%v", id, i)
		payload := &pb.ParDoPayload{
			DoFn: &pb.SdkFunctionSpec{
				Spec: &pb.FunctionSpec{
					Urn: URNInject,
					Payload: []byte(protox.MustEncodeBase64(&v1.TransformPayload{
						Urn:    URNInject,
						Inject: &v1.InjectPayload{N: (int32)(i)},
					})),
				},
				EnvironmentId: m.addDefaultEnv(),
			},
		}
		inject := &pb.PTransform{
			UniqueName: injectID,
			Spec: &pb.FunctionSpec{
				Urn:     URNParDo,
				Payload: protox.MustEncode(payload),
			},
			Inputs:  map[string]string{"i0": nodeID(in.From)},
			Outputs: map[string]string{"i0": out},
		}
		m.transforms[injectID] = inject
		subtransforms = append(subtransforms, injectID)

		inputs[fmt.Sprintf("i%v", i)] = out
	}

	outNode := edge.Edge.Output[0].To

	// Flatten

	out := fmt.Sprintf("%v_flatten", nodeID(outNode))
	m.makeNode(out, kvCoderID, outNode)

	flattenID := fmt.Sprintf("%v_flatten", id)
	flatten := &pb.PTransform{
		UniqueName: flattenID,
		Spec:       &pb.FunctionSpec{Urn: URNFlatten},
		Inputs:     inputs,
		Outputs:    map[string]string{"i0": out},
	}
	m.transforms[flattenID] = flatten
	subtransforms = append(subtransforms, flattenID)

	// CoGBK

	gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
	m.makeNode(gbkOut, gbkCoderID, outNode)

	gbkID := fmt.Sprintf("%v_gbk", id)
	gbk := &pb.PTransform{
		UniqueName: gbkID,
		Spec:       &pb.FunctionSpec{Urn: URNGBK},
		Inputs:     map[string]string{"i0": out},
		Outputs:    map[string]string{"i0": gbkOut},
	}
	m.transforms[gbkID] = gbk
	subtransforms = append(subtransforms, gbkID)

	// Expand

	m.addNode(outNode)

	expandID := fmt.Sprintf("%v_expand", id)
	payload := &pb.ParDoPayload{
		DoFn: &pb.SdkFunctionSpec{
			Spec: &pb.FunctionSpec{
				Urn: URNExpand,
				Payload: []byte(protox.MustEncodeBase64(&v1.TransformPayload{
					Urn: URNExpand,
				})),
			},
			EnvironmentId: m.addDefaultEnv(),
		},
	}
	expand := &pb.PTransform{
		UniqueName: expandID,
		Spec: &pb.FunctionSpec{
			Urn:     URNParDo,
			Payload: protox.MustEncode(payload),
		},
		Inputs:  map[string]string{"i0": gbkOut},
		Outputs: map[string]string{"i0": nodeID(outNode)},
	}
	m.transforms[id] = expand
	subtransforms = append(subtransforms, id)

	// Add composite for visualization

	cogbkID := fmt.Sprintf("%v_cogbk", id)
	m.transforms[cogbkID] = &pb.PTransform{
		UniqueName:    edge.Name,
		Subtransforms: subtransforms,
	}
	return cogbkID
}

func (m *marshaller) addNode(n *graph.Node) string {
	id := nodeID(n)
	if _, exists := m.pcollections[id]; exists {
		return id
	}
	// TODO(herohde) 11/15/2017: expose UniqueName to user.
	return m.makeNode(id, m.coders.Add(n.Coder), n)
}

func (m *marshaller) makeNode(id, cid string, n *graph.Node) string {
	col := &pb.PCollection{
		UniqueName:          id,
		CoderId:             cid,
		IsBounded:           boolToBounded(n.Bounded()),
		WindowingStrategyId: m.addWindowingStrategy(n.WindowingStrategy()),
	}
	m.pcollections[id] = col
	return id
}

func boolToBounded(bounded bool) pb.IsBounded_Enum {
	if bounded {
		return pb.IsBounded_BOUNDED
	}
	return pb.IsBounded_UNBOUNDED
}

func (m *marshaller) addDefaultEnv() string {
	const id = "go"
	if _, exists := m.environments[id]; !exists {
		m.environments[id] = &m.opt.Environment
	}
	return id
}

func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) string {
	ws := marshalWindowingStrategy(m.coders, w)
	return m.internWindowingStrategy(ws)
}

func (m *marshaller) internWindowingStrategy(w *pb.WindowingStrategy) string {
	key := proto.MarshalTextString(w)
	if id, exists := m.windowing2id[key]; exists {
		return id
	}

	id := fmt.Sprintf("w%v", len(m.windowing2id))
	m.windowing2id[key] = id
	m.windowing[id] = w
	return id
}

// marshalWindowingStrategy marshals the given windowing strategy in
// the given coder context.
func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) *pb.WindowingStrategy {
	ws := &pb.WindowingStrategy{
		WindowFn: &pb.SdkFunctionSpec{
			Spec: makeWindowFn(w.Fn),
		},
		MergeStatus:      pb.MergeStatus_NON_MERGING,
		AccumulationMode: pb.AccumulationMode_DISCARDING,
		WindowCoderId:    c.AddWindowCoder(makeWindowCoder(w.Fn)),
		Trigger: &pb.Trigger{
			Trigger: &pb.Trigger_Default_{
				Default: &pb.Trigger_Default{},
			},
		},
		OutputTime:      pb.OutputTime_END_OF_WINDOW,
		ClosingBehavior: pb.ClosingBehavior_EMIT_IF_NONEMPTY,
		AllowedLateness: 0,
		OnTimeBehavior:  pb.OnTimeBehavior_FIRE_ALWAYS,
	}
	return ws
}

func makeWindowFn(w *window.Fn) *pb.FunctionSpec {
	switch w.Kind {
	case window.GlobalWindows:
		return &pb.FunctionSpec{
			Urn: URNGlobalWindowsWindowFn,
		}
	case window.FixedWindows:
		return &pb.FunctionSpec{
			Urn: URNFixedWindowsWindowFn,
			Payload: protox.MustEncode(
				&pb.FixedWindowsPayload{
					Size: ptypes.DurationProto(w.Size),
				},
			),
		}
	case window.SlidingWindows:
		return &pb.FunctionSpec{
			Urn: URNSlidingWindowsWindowFn,
			Payload: protox.MustEncode(
				&pb.SlidingWindowsPayload{
					Size:   ptypes.DurationProto(w.Size),
					Period: ptypes.DurationProto(w.Period),
				},
			),
		}
	case window.Sessions:
		return &pb.FunctionSpec{
			Urn: URNSessionsWindowFn,
			Payload: protox.MustEncode(
				&pb.SessionsPayload{
					GapSize: ptypes.DurationProto(w.Gap),
				},
			),
		}
	default:
		panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
	}
}

func makeWindowCoder(w *window.Fn) *coder.WindowCoder {
	switch w.Kind {
	case window.GlobalWindows:
		return coder.NewGlobalWindow()
	case window.FixedWindows, window.SlidingWindows, URNSlidingWindowsWindowFn:
		return coder.NewIntervalWindow()
	default:
		panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
	}
}

func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string {
	ref, err := EncodeMultiEdge(edge)
	if err != nil {
		panic(errors.Wrapf(err, "Failed to serialize %v", edge))
	}
	return protox.MustEncodeBase64(&v1.TransformPayload{
		Urn:  URNDoFn,
		Edge: ref,
	})
}

// makeBytesKeyedCoder returns KV<[]byte,A,> for any coder,
// even if the coder is already a KV coder.
func makeBytesKeyedCoder(c *coder.Coder) *coder.Coder {
	return coder.NewKV([]*coder.Coder{coder.NewBytes(), c})
}

func edgeID(edge *graph.MultiEdge) string {
	return fmt.Sprintf("e%v", edge.ID())
}

func nodeID(n *graph.Node) string {
	return fmt.Sprintf("n%v", n.ID())
}

func scopeID(s *graph.Scope) string {
	return fmt.Sprintf("s%v", s.ID())
}
