/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kv

import (
	"context"
	"encoding/json"
	"regexp"
	"strings"

	"github.com/go-chassis/cari/sync"
	"github.com/go-chassis/etcdadpt"
	"github.com/go-chassis/openlog"

	"github.com/apache/servicecomb-kie/pkg/model"
	"github.com/apache/servicecomb-kie/pkg/util"
	"github.com/apache/servicecomb-kie/server/datasource"
	"github.com/apache/servicecomb-kie/server/datasource/auth"
	"github.com/apache/servicecomb-kie/server/datasource/etcd/key"

	eventbase "github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
)

// Dao operate data in mongodb
type Dao struct {
}

func (s *Dao) Create(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) (*model.KVDoc, error) {
	if err := auth.CheckCreateKV(ctx, kv); err != nil {
		return nil, err
	}

	opts := datasource.NewWriteOptions(options...)
	var exist bool
	var err error
	if opts.SyncEnable {
		// if syncEnable is true, will create task in a transaction operation
		exist, err = txnCreate(ctx, kv)
	} else {
		exist, err = create(ctx, kv)
	}
	if err != nil {
		openlog.Error("create error", openlog.WithTags(openlog.Tags{
			"err": err.Error(),
			"kv":  kv,
		}))
		return nil, err
	}
	if !exist {
		openlog.Error("create error", openlog.WithTags(openlog.Tags{
			"err": datasource.ErrKVAlreadyExists.Error(),
			"kv":  kv,
		}))
		return nil, datasource.ErrKVAlreadyExists
	}
	return kv, nil
}

func create(ctx context.Context, kv *model.KVDoc) (bool, error) {
	kvBytes, err := json.Marshal(kv)
	if err != nil {
		openlog.Error("fail to marshal kv " + err.Error())
		return false, err
	}
	return etcdadpt.InsertBytes(ctx, key.KV(kv.Domain, kv.Project, kv.ID), kvBytes)
}

func txnCreate(ctx context.Context, kv *model.KVDoc) (bool, error) {
	kvBytes, err := json.Marshal(kv)
	if err != nil {
		openlog.Error("fail to marshal kv " + err.Error())
		return false, err
	}
	task, err := sync.NewTask(kv.Domain, kv.Project, sync.CreateAction, datasource.ConfigResource, kv)
	if err != nil {
		openlog.Error("fail to create task" + err.Error())
		return false, err
	}
	taskBytes, err := json.Marshal(task)
	if err != nil {
		openlog.Error("fail to marshal task ")
		return false, err
	}
	kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.KV(kv.Domain, kv.Project, kv.ID)), etcdadpt.WithValue(kvBytes))
	taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TaskKey(kv.Domain, kv.Project, task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))
	resp, err := etcdadpt.TxnWithCmp(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut},
		etcdadpt.If(etcdadpt.NotExistKey(string(kvOpPut.Key)), etcdadpt.NotExistKey(string(taskOpPut.Key))), nil)
	if err != nil {
		return false, err
	}
	return resp.Succeeded, nil
}

// Update update key value
func (s *Dao) Update(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) error {
	keyKV := key.KV(kv.Domain, kv.Project, kv.ID)
	resp, err := etcdadpt.Get(ctx, keyKV)
	if err != nil {
		openlog.Error(err.Error())
		return err
	}
	if resp == nil {
		return datasource.ErrKeyNotExists
	}
	var oldKV model.KVDoc
	err = json.Unmarshal(resp.Value, &oldKV)
	if err != nil {
		openlog.Error(err.Error())
		return err
	}

	if err := auth.CheckUpdateKV(ctx, &oldKV); err != nil {
		return err
	}

	oldKV.LabelFormat = kv.LabelFormat
	oldKV.Value = kv.Value
	oldKV.Status = kv.Status
	oldKV.Checker = kv.Checker
	oldKV.UpdateTime = kv.UpdateTime
	oldKV.UpdateRevision = kv.UpdateRevision

	opts := datasource.NewWriteOptions(options...)
	if opts.SyncEnable {
		// if syncEnable is true, will create task in a transaction operation
		err = txnUpdate(ctx, kv)
	} else {
		err = update(ctx, &oldKV, options...)
	}
	if err != nil {
		openlog.Error(err.Error())
		return err
	}
	return nil
}

func txnUpdate(ctx context.Context, kv *model.KVDoc) error {
	keyKV := key.KV(kv.Domain, kv.Project, kv.ID)
	kvBytes, err := json.Marshal(kv)
	if err != nil {
		openlog.Error(err.Error())
		return err
	}
	task, err := sync.NewTask(kv.Domain, kv.Project, sync.UpdateAction, datasource.ConfigResource, kv)
	if err != nil {
		openlog.Error("fail to create task" + err.Error())
		return err
	}
	taskBytes, err := json.Marshal(task)
	if err != nil {
		openlog.Error(err.Error())
		return err
	}
	kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(keyKV), etcdadpt.WithValue(kvBytes))
	taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TaskKey(kv.Domain, kv.Project, task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))
	return etcdadpt.Txn(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut})
}

func update(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) error {
	keyKV := key.KV(kv.Domain, kv.Project, kv.ID)
	kvBytes, err := json.Marshal(kv)
	if err != nil {
		openlog.Error(err.Error())
		return err
	}
	return etcdadpt.PutBytes(ctx, keyKV, kvBytes)
}

// Extract key values
func getValue(str string) string {
	rex := regexp.MustCompile(`\(([^)]+)\)`)
	res := rex.FindStringSubmatch(str)
	return res[len(res)-1]
}

// Exist supports you query a key value by label map or labels id
func (s *Dao) Exist(ctx context.Context, key, project, domain string, options ...datasource.FindOption) (bool, error) {
	opts := datasource.FindOptions{Key: key}
	for _, o := range options {
		o(&opts)
	}
	kvs, err := s.listNoAuth(ctx, project, domain,
		datasource.WithExactLabels(),
		datasource.WithLabels(opts.Labels),
		datasource.WithLabelFormat(opts.LabelFormat),
		datasource.WithKey(key),
		datasource.WithCaseSensitive())
	if err != nil {
		openlog.Error("check kv exist: " + err.Error())
		return false, err
	}
	if IsUniqueFind(opts) && len(kvs.Data) == 0 {
		return false, nil
	}
	if len(kvs.Data) != 1 {
		return false, datasource.ErrTooMany
	}
	return true, nil
}

func (s *Dao) GetByKey(ctx context.Context, key, project, domain string, options ...datasource.FindOption) ([]*model.KVDoc, error) {
	opts := datasource.FindOptions{Key: key}
	for _, o := range options {
		o(&opts)
	}
	kvs, err := s.listNoAuth(ctx, project, domain,
		datasource.WithExactLabels(),
		datasource.WithLabels(opts.Labels),
		datasource.WithLabelFormat(opts.LabelFormat),
		datasource.WithKey(key),
		datasource.WithCaseSensitive())
	if err != nil {
		openlog.Error("check kv exist: " + err.Error())
		return nil, err
	}
	if IsUniqueFind(opts) && len(kvs.Data) == 0 {
		return nil, datasource.ErrKeyNotExists
	}
	if len(kvs.Data) != 1 {
		return nil, datasource.ErrTooMany
	}
	return kvs.Data, nil
}

// FindOneAndDelete deletes one kv by id and return the deleted kv as these appeared before deletion
// domain=tenant
func (s *Dao) FindOneAndDelete(ctx context.Context, kvID, project, domain string, options ...datasource.WriteOption) (*model.KVDoc, error) {
	opts := datasource.NewWriteOptions(options...)
	if opts.SyncEnable {
		// if syncEnable is ture, will delete kv, create task and create tombstone in a transaction operation
		return txnFindOneAndDelete(ctx, kvID, project, domain)
	}
	return findOneAndDelete(ctx, kvID, project, domain)
}

func findOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
	kvKey := key.KV(domain, project, kvID)
	kvDoc := model.KVDoc{}

	if _, err := getKVDoc(ctx, domain, project, kvID); err != nil {
		return nil, err
	}

	resp, err := etcdadpt.ListAndDelete(ctx, kvKey)
	if err != nil {
		openlog.Error("delete Key error: " + err.Error())
		return nil, err
	}
	if resp.Count == 0 {
		return nil, datasource.ErrKeyNotExists
	}
	err = json.Unmarshal(resp.Kvs[0].Value, &kvDoc)
	if err != nil {
		openlog.Error("decode error: " + err.Error())
		return nil, err
	}
	return &kvDoc, nil
}

// txnFindOneAndDelete is to start transaction when delete KV, will create task and tombstone in a transaction operation
func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
	kvKey := key.KV(domain, project, kvID)
	kvDoc, err := getKVDoc(ctx, domain, project, kvID)
	if err != nil {
		openlog.Error(err.Error())
		return nil, err
	}
	task, err := sync.NewTask(domain, project, sync.DeleteAction, datasource.ConfigResource, kvDoc)
	if err != nil {
		openlog.Error("fail to create task" + err.Error())
		return nil, err
	}
	taskBytes, err := json.Marshal(task)
	if err != nil {
		openlog.Error("fail to marshal task" + err.Error())
		return nil, err
	}
	tombstone := sync.NewTombstone(domain, project, datasource.ConfigResource, datasource.TombstoneID(kvDoc))
	tombstoneBytes, err := json.Marshal(tombstone)
	if err != nil {
		openlog.Error("fail to marshal tombstone" + err.Error())
		return nil, err
	}
	kvOpDel := etcdadpt.OpDel(etcdadpt.WithStrKey(kvKey))
	taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TaskKey(domain, project,
		task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))
	tombstoneOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TombstoneKey(domain, project, tombstone.ResourceType, tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes))
	err = etcdadpt.Txn(ctx, []etcdadpt.OpOptions{kvOpDel, taskOpPut, tombstoneOpPut})
	if err != nil {
		openlog.Error("find and delete error", openlog.WithTags(openlog.Tags{
			"err": err.Error(),
		}))
		return nil, err
	}
	return kvDoc, nil
}

// getKVDoc is to get kv for delete
func getKVDoc(ctx context.Context, domain, project, kvID string) (*model.KVDoc, error) {
	resp, err := etcdadpt.Get(ctx, key.KV(domain, project, kvID))
	if err != nil {
		openlog.Error(err.Error())
		return nil, err
	}
	if resp == nil {
		return nil, datasource.ErrKeyNotExists
	}
	curKV := &model.KVDoc{}
	err = json.Unmarshal(resp.Value, curKV)
	if err != nil {
		openlog.Error("decode error: " + err.Error())
		return nil, err
	}

	if err := auth.CheckDeleteKV(ctx, curKV); err != nil {
		return nil, err
	}

	return curKV, nil
}

// FindManyAndDelete deletes multiple kvs and return the deleted kv list as these appeared before deletion
func (s *Dao) FindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string, options ...datasource.WriteOption) ([]*model.KVDoc, int64, error) {
	opts := datasource.NewWriteOptions(options...)
	if opts.SyncEnable {
		// if sync enable is true, will delete kvs, create tasks and tombstones
		return txnFindManyAndDelete(ctx, kvIDs, project, domain)
	}
	return findManyAndDelete(ctx, kvIDs, project, domain)
}

func findManyAndDelete(ctx context.Context, kvIDs []string, project, domain string) ([]*model.KVDoc, int64, error) {
	var docs []*model.KVDoc
	var opOptions []etcdadpt.OpOptions
	var err error
	for _, id := range kvIDs {
		if _, err = getKVDoc(ctx, domain, project, id); err != nil {
			continue
		}
		opOptions = append(opOptions, etcdadpt.OpDel(etcdadpt.WithStrKey(key.KV(domain, project, id))))
	}

	if len(opOptions) == 0 {
		return make([]*model.KVDoc, 0), 0, err
	}

	resp, err := etcdadpt.ListAndDeleteMany(ctx, opOptions...)
	if err != nil {
		openlog.Error("find Keys error: " + err.Error())
		return nil, 0, err
	}
	if resp.Count == 0 {
		return nil, 0, datasource.ErrKeyNotExists
	}
	for _, kv := range resp.Kvs {
		var doc model.KVDoc
		err := json.Unmarshal(kv.Value, &doc)
		if err != nil {
			openlog.Error("fail to unmarshal kv" + err.Error())
			return nil, 0, err
		}
		docs = append(docs, &doc)
	}
	return docs, resp.Count, nil
}

// txnFindManyAndDelete is to start transaction when delete KVs, will create tasks and tombstones in a transaction operation
func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string) ([]*model.KVDoc, int64, error) {
	var docs []*model.KVDoc
	var opOptions []etcdadpt.OpOptions
	kvTotalNum := len(kvIDs)
	docs = make([]*model.KVDoc, kvTotalNum)
	tasks := make([]*sync.Task, kvTotalNum)
	tombstones := make([]*sync.Tombstone, kvTotalNum)
	successKVNum := 0
	for i := 0; i < kvTotalNum; i++ {
		kvDoc, err := getKVDoc(ctx, domain, project, kvIDs[i])
		// if not find the kv, continue
		if err != nil {
			if err == datasource.ErrKeyNotExists {
				openlog.Error(err.Error())
				continue
			}
			return nil, 0, err
		}
		if kvDoc == nil {
			continue
		}
		task, err := sync.NewTask(domain, project, sync.DeleteAction, datasource.ConfigResource, kvDoc)
		if err != nil {
			openlog.Error("fail to create task")
			return nil, 0, err
		}
		docs[successKVNum] = kvDoc
		tasks[successKVNum] = task
		tombstones[successKVNum] = sync.NewTombstone(domain, project, datasource.ConfigResource,
			datasource.TombstoneID(kvDoc))
		successKVNum++
	}
	if successKVNum == 0 {
		return nil, 0, datasource.ErrKeyNotExists
	}
	if successKVNum != kvTotalNum {
		docs = docs[:successKVNum]
		tasks = tasks[:successKVNum]
		tombstones = tombstones[:successKVNum]
	}
	for _, id := range kvIDs {
		opOptions = append(opOptions, etcdadpt.OpDel(etcdadpt.WithStrKey(key.KV(domain, project, id))))
	}
	for _, task := range tasks {
		taskBytes, err := json.Marshal(task)
		if err != nil {
			openlog.Error("fail to marshal task" + err.Error())
			return nil, 0, err
		}
		opOptions = append(opOptions, etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TaskKey(domain, project,
			task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes)))
	}
	for _, tombstone := range tombstones {
		tombstoneBytes, err := json.Marshal(tombstone)
		if err != nil {
			openlog.Error("fail to marshal tombstone" + err.Error())
			return nil, 0, err
		}
		opOptions = append(opOptions, etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TombstoneKey(domain, project,
			tombstone.ResourceType, tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes)))
	}
	err := etcdadpt.Txn(ctx, opOptions)
	if err != nil {
		openlog.Error("find many and delete error", openlog.WithTags(openlog.Tags{
			"err": err.Error(),
		}))
		return nil, 0, err
	}
	return docs, int64(successKVNum), nil
}

// Get get kv by kv id
func (s *Dao) Get(ctx context.Context, req *model.GetKVRequest) (*model.KVDoc, error) {
	resp, err := etcdadpt.Get(ctx, key.KV(req.Domain, req.Project, req.ID))
	if err != nil {
		openlog.Error(err.Error())
		return nil, err
	}
	if resp == nil {
		return nil, datasource.ErrKeyNotExists
	}
	curKV := &model.KVDoc{}
	err = json.Unmarshal(resp.Value, curKV)
	if err != nil {
		openlog.Error("decode error: " + err.Error())
		return nil, err
	}

	if err := auth.CheckGetKV(ctx, curKV); err != nil {
		return nil, err
	}

	return curKV, nil
}

func (s *Dao) Total(ctx context.Context, project, domain string) (int64, error) {
	_, total, err := etcdadpt.List(ctx, key.KVList(domain, project), etcdadpt.WithCountOnly())
	if err != nil {
		openlog.Error("find total number: " + err.Error())
		return 0, err
	}
	return total, err
}

// List get kv list by key and criteria
func (s *Dao) List(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) {
	result, opts, err := s.listData(ctx, project, domain, options...)
	if err != nil {
		return nil, err
	}

	filterKVs, err := auth.FilterKVList(ctx, result.Data)
	if err != nil {
		return nil, err
	}

	result.Data = filterKVs
	result.Total = len(filterKVs)

	return pagingResult(result, opts), nil
}

func (s *Dao) listNoAuth(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) {
	result, opts, err := s.listData(ctx, project, domain, options...)
	if err != nil {
		return nil, err
	}

	return pagingResult(result, opts), nil
}

// List get kv list by key and criteria
func (s *Dao) listData(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, datasource.FindOptions, error) {
	opts := datasource.NewDefaultFindOpts()
	for _, o := range options {
		o(&opts)
	}
	ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
	defer cancel()

	regex, err := toRegex(opts)
	if err != nil {
		return nil, opts, err
	}

	if Enabled() {
		result, useCache, err := Search(ctx, &CacheSearchReq{
			Domain:  domain,
			Project: project,
			Opts:    &opts,
			Regex:   regex,
		})
		if useCache && err == nil {
			return result, opts, nil
		}
		if useCache && err != nil {
			openlog.Error("using cache to search kv failed: " + err.Error())
		}
	}

	result, err := matchLabelsSearch(ctx, domain, project, regex, opts)
	if err != nil {
		openlog.Error("list kv failed: " + err.Error())
		return nil, opts, err
	}

	return result, opts, nil
}

func matchLabelsSearch(ctx context.Context, domain, project string, regex *regexp.Regexp, opts datasource.FindOptions) (*model.KVResponse, error) {
	openlog.Debug("using labels to search kv")
	kvs, _, err := etcdadpt.List(ctx, key.KVList(domain, project))
	if err != nil {
		return nil, err
	}
	result := &model.KVResponse{
		Data: []*model.KVDoc{},
	}
	for _, kv := range kvs {
		var doc model.KVDoc
		err := json.Unmarshal(kv.Value, &doc)
		if err != nil {
			openlog.Error("decode to KVList error: " + err.Error())
			continue
		}
		if !filterMatch(&doc, opts, regex) {
			continue
		}

		datasource.ClearPart(&doc)
		result.Data = append(result.Data, &doc)
		result.Total++

		if IsUniqueFind(opts) {
			break
		}
	}

	return result, nil
}

func IsUniqueFind(opts datasource.FindOptions) bool {
	return opts.LabelFormat != "" && opts.Key != ""
}

func toRegex(opts datasource.FindOptions) (*regexp.Regexp, error) {
	var value string
	if opts.Key == "" {
		return nil, nil
	}
	switch {
	case strings.HasPrefix(opts.Key, "beginWith("):
		value = strings.ReplaceAll(getValue(opts.Key), ".", "\\.") + ".*"
	case strings.HasPrefix(opts.Key, "wildcard("):
		value = strings.ReplaceAll(getValue(opts.Key), ".", "\\.")
		value = strings.ReplaceAll(value, "*", ".*")
	default:
		value = strings.ReplaceAll(opts.Key, ".", "\\.")
	}
	value = "^" + value + "$"
	if !opts.CaseSensitive {
		value = "(?i)" + value
	}
	regex, err := regexp.Compile(value)
	if err != nil {
		openlog.Error("invalid wildcard expr: " + value + ", error: " + err.Error())
		return nil, err
	}
	return regex, nil
}

func pagingResult(result *model.KVResponse, opts datasource.FindOptions) *model.KVResponse {
	datasource.ReverseByPriorityAndUpdateRev(result.Data)

	if opts.Limit == 0 {
		return result
	}
	total := int64(result.Total)
	if opts.Offset >= total {
		result.Data = []*model.KVDoc{}
		return result
	}
	end := opts.Offset + opts.Limit
	if end > total {
		end = total
	}
	result.Data = result.Data[opts.Offset:end]
	return result
}

func filterMatch(doc *model.KVDoc, opts datasource.FindOptions, regex *regexp.Regexp) bool {
	if opts.Status != "" && doc.Status != opts.Status {
		return false
	}
	if regex != nil && !regex.MatchString(doc.Key) {
		return false
	}
	if len(opts.Labels) != 0 {
		if opts.ExactLabels && !util.IsEquivalentLabel(opts.Labels, doc.Labels) {
			return false
		}
		if !opts.ExactLabels && !util.IsContainLabel(doc.Labels, opts.Labels) {
			return false
		}
	}
	if opts.LabelFormat != "" && doc.LabelFormat != opts.LabelFormat {
		return false
	}
	if opts.Value != "" && !strings.Contains(doc.Value, opts.Value) {
		return false
	}
	return true
}
