// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package typex contains full type representation for PCollections and DoFns, and
// utilities for type checking.
package typex

import (
	"fmt"
	"reflect"
	"strings"

	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)

// FullType represents the tree structure of data types processed by the graph.
// It allows representation of composite types, such as KV<int, string> or
// W<CoGBK<int, int>>, as well as "generic" such types, KV<int,T> or CoGBK<X,Y>,
// where the free "type variables" are the fixed universal types: T, X, etc.
type FullType interface {
	// Class returns the class of the FullType. It is never Illegal.
	Class() Class
	// Type returns the Go type at the root of the tree, such as KV, X
	// or int.
	Type() reflect.Type
	// Components returns the real components of the root type, if Composite.
	Components() []FullType
}

type tree struct {
	class      Class
	t          reflect.Type
	components []FullType
}

func (t *tree) Class() Class {
	return t.class
}

func (t *tree) Type() reflect.Type {
	return t.t
}

func (t *tree) Components() []FullType {
	return t.components
}

func (t *tree) String() string {
	switch t.class {
	case Concrete:
		return t.t.String()
	case Universal:
		return t.t.Name()
	case Container:
		if IsList(t.t) {
			return fmt.Sprintf("[]%v", t.components[0])
		}
		return fmt.Sprintf("<invalid: %v>", t.t)
	case Composite:
		var args []string
		for _, c := range t.components {
			args = append(args, fmt.Sprintf("%v", c))
		}
		return fmt.Sprintf("%v<%v>", printShortComposite(t.t), strings.Join(args, ","))
	default:
		return fmt.Sprintf("<invalid: %v>", t.t)
	}
}

func printShortComposite(t reflect.Type) string {
	switch t {
	case WindowedValueType:
		return "W"
	case CoGBKType:
		return "CoGBK"
	case KVType:
		return "KV"
	default:
		return fmt.Sprintf("invalid(%v)", t)
	}
}

// TODO(herohde) 4/20/2017: internalize fulltypes?

// New constructs a new full type with the given elements. It panics
// if not valid.
func New(t reflect.Type, components ...FullType) FullType {
	checkTypesNotNil(components)

	class := ClassOf(t)
	switch class {
	case Concrete, Universal:
		return &tree{class, t, nil}
	case Container:
		switch t.Kind() {
		case reflect.Slice:
			// We include the child type as a component for convenience.
			return &tree{class, t, []FullType{New(t.Elem())}}
		default:
			panic(fmt.Sprintf("Unexpected aggregate type: %v", t))
		}
	case Composite:
		switch t {
		case KVType:
			if len(components) != 2 {
				panic("Invalid number of components for KV")
			}
			if isAnyNonKVComposite(components) {
				panic("Invalid to nest composites inside KV")
			}
			return &tree{class, t, components}
		case WindowedValueType:
			if len(components) != 1 {
				panic("Invalid number of components for WindowedValue")
			}
			if components[0].Type() == WindowedValueType {
				panic("Invalid to nest WindowedValue")
			}
			return &tree{class, t, components}
		case CoGBKType:
			if len(components) < 2 {
				panic("Invalid number of components for CoGBK")
			}
			if isAnyNonKVComposite(components) {
				panic("Invalid to nest composites inside CoGBK")
			}
			return &tree{class, t, components}
		default:
			panic(fmt.Sprintf("Unexpected composite type: %v", t))
		}
	default:
		panic(fmt.Sprintf("Invalid underlying type: %v", t))
	}
}

// NOTE(herohde) 1/26/2018: we allow nested KV types and coders to support the
// CoGBK translation (using KV<K,KV<int,[]byte>> to encode keyed raw union values)
// and potentially other uses. We do not have a reasonable way to emit nested KV
// values, so user functions are still limited to non-nested KVs. Universally-typed
// KV-values might be simple to allow, for example.

func isAnyNonKVComposite(list []FullType) bool {
	for _, t := range list {
		if t.Class() == Composite && t.Type() != KVType {
			return true
		}
	}
	return false
}

// Convenience functions.

// IsW returns true iff the type is a WindowedValue.
func IsW(t FullType) bool {
	return t.Type() == WindowedValueType
}

// NewW constructs a new WindowedValue of the given type.
func NewW(t FullType) FullType {
	return New(WindowedValueType, t)
}

// SkipW skips a WindowedValue layer, if present. If no, returns the input.
func SkipW(t FullType) FullType {
	if t.Type() == WindowedValueType {
		return t.Components()[0]
	}
	return t
}

// IsKV returns true iff the type is a KV.
func IsKV(t FullType) bool {
	return t.Type() == KVType
}

// NewKV constructs a new KV of the given key and value types.
func NewKV(components ...FullType) FullType {
	return New(KVType, components...)
}

// IsCoGBK returns true iff the type is a CoGBK.
func IsCoGBK(t FullType) bool {
	return t.Type() == CoGBKType
}

// NewCoGBK constructs a new CoGBK of the given component types.
func NewCoGBK(components ...FullType) FullType {
	return New(CoGBKType, components...)
}

// IsStructurallyAssignable returns true iff a from value is structurally
// assignable to the to value of the given types. Types that are
// "structurally assignable" (SA) are assignable if type variables are
// disregarded. In other words, depending on the bindings of universal
// type variables, types may or may not be assignable. However, types that
// are not SA are not assignable under any bindings.
//
// For example:
//
//   SA:  KV<int,int>    := KV<int,int>
//   SA:  KV<int,X>      := KV<int,string>  // X bound to string by assignment
//   SA:  KV<int,string> := KV<int,X>       // Assignable only if X is already bound to string
//   SA:  KV<int,string> := KV<X,X>         // Not assignable under any binding
//
//   Not SA:  KV<int,string> := KV<string,X>
//   Not SA:  X              := KV<int,string>
//   Not SA:  GBK(X,Y)       := KV<int,string>
//
func IsStructurallyAssignable(from, to FullType) bool {
	switch from.Class() {
	case Concrete:
		if to.Class() == Concrete {
			return from.Type().AssignableTo(to.Type())
		}
		return to.Class() == Universal
	case Universal:
		// Universals are not structurally assignable to Composites, such as KV or GBK.
		return to.Class() == Universal || to.Class() == Concrete || to.Class() == Container
	case Container:
		if to.Class() == Container {
			return IsList(from.Type()) && IsList(to.Type()) && IsStructurallyAssignable(from.Components()[0], to.Components()[0])
		}
		return to.Class() == Universal
	case Composite:
		if from.Type() != to.Type() {
			return false
		}
		if len(from.Components()) != len(to.Components()) {
			return false
		}
		for i, elm := range from.Components() {
			if !IsStructurallyAssignable(elm, to.Components()[i]) {
				return false
			}
		}
		return true
	default:
		return false
	}
}

// IsEqual returns true iff the types are equal.
func IsEqual(from, to FullType) bool {
	if from.Type() != to.Type() {
		return false
	}
	if len(from.Components()) != len(to.Components()) {
		return false
	}
	for i, elm := range from.Components() {
		if !IsEqual(elm, to.Components()[i]) {
			return false
		}
	}
	return true
}

// IsEqualList returns true iff the lists of types are equal.
func IsEqualList(from, to []FullType) bool {
	if len(from) != len(to) {
		return false
	}
	for i, t := range from {
		if !IsEqual(t, to[i]) {
			return false
		}
	}
	return true
}

// IsBound returns true iff the type has no universal type components.
// Nodes and coders need bound types.
func IsBound(t FullType) bool {
	if t.Class() == Universal {
		return false
	}
	for _, elm := range t.Components() {
		if !IsBound(elm) {
			return false
		}
	}
	return true
}

// Bind returns a substitution from universals to types in the given models,
// such as {"T" -> X, "X" -> int}. Each model must be assignable to the
// corresponding type. For example, Bind(KV<T,int>, KV<string, int>) would
// produce {"T" -> string}.
func Bind(types, models []FullType) (map[string]reflect.Type, error) {
	if len(types) != len(models) {
		return nil, errors.Errorf("typex.Bind: invalid number of models: %v, want %v", len(models), len(types))
	}

	m := make(map[string]reflect.Type)
	for i := 0; i < len(types); i++ {
		t := types[i]
		model := models[i]

		if !IsStructurallyAssignable(model, t) {
			return nil, errors.Errorf("typex.Bind: %v is not assignable to %v", model, t)
		}
		if err := walk(t, model, m); err != nil {
			return nil, err
		}
	}
	return m, nil
}

func walk(t, model FullType, m map[string]reflect.Type) error {
	switch t.Class() {
	case Universal:
		// By checking that the model is assignable to t, we know that they are
		// structurally compatible. We rely on the exact reflect.Type in the
		// Aggregate case to pick the correct binding, i.e., we do not need to
		// construct such a type.

		name := t.Type().Name()
		if current, ok := m[name]; ok && current != model.Type() {
			return errors.Errorf("bind conflict for %v: %v != %v", name, current, model.Type())
		}
		m[name] = model.Type()
		return nil
	case Composite, Container:
		for i, elm := range t.Components() {
			if err := walk(elm, model.Components()[i], m); err != nil {
				return err
			}
		}
		return nil
	default:
		return nil
	}
}

// Substitute returns types identical to the given types, but with all
// universals substituted. All free type variables must be present in the
// substitution.
func Substitute(list []FullType, m map[string]reflect.Type) ([]FullType, error) {
	var ret []FullType
	for _, t := range list {
		repl, err := substitute(t, m)
		if err != nil {
			return nil, err
		}
		ret = append(ret, repl)
	}
	return ret, nil
}

func substitute(t FullType, m map[string]reflect.Type) (FullType, error) {
	switch t.Class() {
	case Universal:
		name := t.Type().Name()
		repl, ok := m[name]
		if !ok {
			return nil, errors.Errorf("substituting type %v: type not bound", name)
		}
		return New(repl), nil
	case Container:
		comp, err := substituteList(t.Components(), m)
		if err != nil {
			return nil, err
		}
		if IsList(t.Type()) {
			return New(reflect.SliceOf(comp[0].Type()), comp...), nil
		}
		return nil, errors.Errorf("unexpected aggregate %v, only slices allowed", t)
	case Composite:
		comp, err := substituteList(t.Components(), m)
		if err != nil {
			return nil, err
		}
		return New(t.Type(), comp...), nil

	default:
		return t, nil
	}
}

func substituteList(list []FullType, m map[string]reflect.Type) ([]FullType, error) {
	var ret []FullType
	for _, elm := range list {
		repl, err := substitute(elm, m)
		if err != nil {
			return nil, err
		}
		ret = append(ret, repl)
	}
	return ret, nil
}

func checkTypesNotNil(list []FullType) {
	for i, t := range list {
		if t == nil {
			panic(fmt.Sprintf("nil type at index: %v", i))
		}
	}
}
