// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"time"

	"github.com/coreos/etcd/client"
	etcdErr "github.com/coreos/etcd/error"
	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/etcdserver/api"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/mvcc"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"github.com/coreos/etcd/pkg/pbutil"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"
	"github.com/coreos/etcd/store"
	"github.com/coreos/etcd/wal"
	"github.com/coreos/etcd/wal/walpb"
	"github.com/gogo/protobuf/proto"
	"github.com/spf13/cobra"
)

var (
	migrateExcludeTTLKey bool
	migrateDatadir       string
	migrateWALdir        string
	migrateTransformer   string
)

// NewMigrateCommand returns the cobra command for "migrate".
func NewMigrateCommand() *cobra.Command {
	mc := &cobra.Command{
		Use:   "migrate",
		Short: "Migrates keys in a v2 store to a mvcc store",
		Run:   migrateCommandFunc,
	}

	mc.Flags().BoolVar(&migrateExcludeTTLKey, "no-ttl", false, "Do not convert TTL keys")
	mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory")
	mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory")
	mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program")
	return mc
}

func migrateCommandFunc(cmd *cobra.Command, args []string) {
	var (
		writer io.WriteCloser
		reader io.ReadCloser
		errc   chan error
	)
	if migrateTransformer != "" {
		writer, reader, errc = startTransformer()
	} else {
		fmt.Println("using default transformer")
		writer, reader, errc = defaultTransformer()
	}

	st, index := rebuildStoreV2()
	be := prepareBackend()
	defer be.Close()

	go func() {
		writeStore(writer, st)
		writer.Close()
	}()

	readKeys(reader, be)
	mvcc.UpdateConsistentIndex(be, index)
	err := <-errc
	if err != nil {
		fmt.Println("failed to transform keys")
		ExitWithError(ExitError, err)
	}

	fmt.Println("finished transforming keys")
}

func prepareBackend() backend.Backend {
	var be backend.Backend

	bch := make(chan struct{})
	dbpath := filepath.Join(migrateDatadir, "member", "snap", "db")
	go func() {
		defer close(bch)
		be = backend.NewDefaultBackend(dbpath)

	}()
	select {
	case <-bch:
	case <-time.After(time.Second):
		fmt.Fprintf(os.Stderr, "waiting for etcd to close and release its lock on %q\n", dbpath)
		<-bch
	}

	tx := be.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket([]byte("key"))
	tx.UnsafeCreateBucket([]byte("meta"))
	tx.Unlock()
	return be
}

func rebuildStoreV2() (store.Store, uint64) {
	var index uint64
	cl := membership.NewCluster("")

	waldir := migrateWALdir
	if len(waldir) == 0 {
		waldir = filepath.Join(migrateDatadir, "member", "wal")
	}
	snapdir := filepath.Join(migrateDatadir, "member", "snap")

	ss := snap.New(snapdir)
	snapshot, err := ss.Load()
	if err != nil && err != snap.ErrNoSnapshot {
		ExitWithError(ExitError, err)
	}

	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
		index = snapshot.Metadata.Index
	}

	w, err := wal.OpenForRead(waldir, walsnap)
	if err != nil {
		ExitWithError(ExitError, err)
	}
	defer w.Close()

	_, _, ents, err := w.ReadAll()
	if err != nil {
		ExitWithError(ExitError, err)
	}

	st := store.New()
	if snapshot != nil {
		err := st.Recovery(snapshot.Data)
		if err != nil {
			ExitWithError(ExitError, err)
		}
	}

	cl.SetStore(st)
	cl.Recover(api.UpdateCapability)

	applier := etcdserver.NewApplierV2(st, cl)
	for _, ent := range ents {
		if ent.Type == raftpb.EntryConfChange {
			var cc raftpb.ConfChange
			pbutil.MustUnmarshal(&cc, ent.Data)
			applyConf(cc, cl)
			continue
		}

		var raftReq pb.InternalRaftRequest
		if !pbutil.MaybeUnmarshal(&raftReq, ent.Data) { // backward compatible
			var r pb.Request
			pbutil.MustUnmarshal(&r, ent.Data)
			applyRequest(&r, applier)
		} else {
			if raftReq.V2 != nil {
				req := raftReq.V2
				applyRequest(req, applier)
			}
		}
		if ent.Index > index {
			index = ent.Index
		}
	}

	return st, index
}

func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
	if err := cl.ValidateConfigurationChange(cc); err != nil {
		return
	}
	switch cc.Type {
	case raftpb.ConfChangeAddNode:
		m := new(membership.Member)
		if err := json.Unmarshal(cc.Context, m); err != nil {
			panic(err)
		}
		cl.AddMember(m)
	case raftpb.ConfChangeRemoveNode:
		cl.RemoveMember(types.ID(cc.NodeID))
	case raftpb.ConfChangeUpdateNode:
		m := new(membership.Member)
		if err := json.Unmarshal(cc.Context, m); err != nil {
			panic(err)
		}
		cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
	}
}

func applyRequest(req *pb.Request, applyV2 etcdserver.ApplierV2) {
	r := (*etcdserver.RequestV2)(req)
	r.TTLOptions()
	switch r.Method {
	case "POST":
		applyV2.Post(r)
	case "PUT":
		applyV2.Put(r)
	case "DELETE":
		applyV2.Delete(r)
	case "QGET":
		applyV2.QGet(r)
	case "SYNC":
		applyV2.Sync(r)
	default:
		panic("unknown command")
	}
}

func writeStore(w io.Writer, st store.Store) uint64 {
	all, err := st.Get("/1", true, true)
	if err != nil {
		if eerr, ok := err.(*etcdErr.Error); ok && eerr.ErrorCode == etcdErr.EcodeKeyNotFound {
			fmt.Println("no v2 keys to migrate")
			os.Exit(0)
		}
		ExitWithError(ExitError, err)
	}
	return writeKeys(w, all.Node)
}

func writeKeys(w io.Writer, n *store.NodeExtern) uint64 {
	maxIndex := n.ModifiedIndex

	nodes := n.Nodes
	// remove store v2 bucket prefix
	n.Key = n.Key[2:]
	if n.Key == "" {
		n.Key = "/"
	}
	if n.Dir {
		n.Nodes = nil
	}
	if !migrateExcludeTTLKey || n.TTL == 0 {
		b, err := json.Marshal(n)
		if err != nil {
			ExitWithError(ExitError, err)
		}
		fmt.Fprint(w, string(b))
	}
	for _, nn := range nodes {
		max := writeKeys(w, nn)
		if max > maxIndex {
			maxIndex = max
		}
	}
	return maxIndex
}

func readKeys(r io.Reader, be backend.Backend) error {
	for {
		length64, err := readInt64(r)
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}

		buf := make([]byte, int(length64))
		if _, err = io.ReadFull(r, buf); err != nil {
			return err
		}

		var kv mvccpb.KeyValue
		err = proto.Unmarshal(buf, &kv)
		if err != nil {
			return err
		}

		mvcc.WriteKV(be, kv)
	}
}

func readInt64(r io.Reader) (int64, error) {
	var n int64
	err := binary.Read(r, binary.LittleEndian, &n)
	return n, err
}

func startTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
	cmd := exec.Command(migrateTransformer)
	cmd.Stderr = os.Stderr

	writer, err := cmd.StdinPipe()
	if err != nil {
		ExitWithError(ExitError, err)
	}

	reader, rerr := cmd.StdoutPipe()
	if rerr != nil {
		ExitWithError(ExitError, rerr)
	}

	if err := cmd.Start(); err != nil {
		ExitWithError(ExitError, err)
	}

	errc := make(chan error, 1)

	go func() {
		errc <- cmd.Wait()
	}()

	return writer, reader, errc
}

func defaultTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
	// transformer decodes v2 keys from sr
	sr, sw := io.Pipe()
	// transformer encodes v3 keys into dw
	dr, dw := io.Pipe()

	decoder := json.NewDecoder(sr)

	errc := make(chan error, 1)

	go func() {
		defer func() {
			sr.Close()
			dw.Close()
		}()

		for decoder.More() {
			node := &client.Node{}
			if err := decoder.Decode(node); err != nil {
				errc <- err
				return
			}

			kv := transform(node)
			if kv == nil {
				continue
			}

			data, err := proto.Marshal(kv)
			if err != nil {
				errc <- err
				return
			}
			buf := make([]byte, 8)
			binary.LittleEndian.PutUint64(buf, uint64(len(data)))
			if _, err := dw.Write(buf); err != nil {
				errc <- err
				return
			}
			if _, err := dw.Write(data); err != nil {
				errc <- err
				return
			}
		}

		errc <- nil
	}()

	return sw, dr, errc
}

func transform(n *client.Node) *mvccpb.KeyValue {
	const unKnownVersion = 1
	if n.Dir {
		return nil
	}
	kv := &mvccpb.KeyValue{
		Key:            []byte(n.Key),
		Value:          []byte(n.Value),
		CreateRevision: int64(n.CreatedIndex),
		ModRevision:    int64(n.ModifiedIndex),
		Version:        unKnownVersion,
	}
	return kv
}
