/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
	"context"
	"errors"
	"math/rand"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/json-iterator/go"
	"github.com/tidwall/gjson"

	"github.com/apache/rocketmq-client-go/v2/internal/remote"
	"github.com/apache/rocketmq-client-go/v2/internal/utils"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/rlog"
)

const (
	EnvNameServerAddr = "NAMESRV_ADDR"

	requestTimeout   = 3 * time.Second
	defaultTopic     = "TBW102"
	defaultQueueNums = 4
	MasterId         = int64(0)
)

var (
	ErrTopicNotExist = errors.New("topic not exist")
)

func (s *namesrvs) cleanOfflineBroker() {
	// TODO optimize
	s.lockNamesrv.Lock()
	s.brokerAddressesMap.Range(func(key, value interface{}) bool {
		brokerName := key.(string)
		bd := value.(*BrokerData)
		for k, v := range bd.BrokerAddresses {
			isBrokerAddrExistInTopicRoute := false
			s.routeDataMap.Range(func(key, value interface{}) bool {
				trd := value.(*TopicRouteData)
				for idx := range trd.BrokerDataList {
					for _, v1 := range trd.BrokerDataList[idx].BrokerAddresses {
						if v1 == v {
							isBrokerAddrExistInTopicRoute = true
							return false
						}
					}
				}
				return true
			})
			if !isBrokerAddrExistInTopicRoute {
				delete(bd.BrokerAddresses, k)
				rlog.Info("the broker: [name=%s, ID=%d, addr=%s,] is offline, remove it", map[string]interface{}{
					"brokerName": brokerName,
					"brokerID":   k,
					"brokerAddr": v,
				})
			}
		}
		if len(bd.BrokerAddresses) == 0 {
			s.brokerAddressesMap.Delete(brokerName)
			rlog.Info("the broker name's host is offline, remove it", map[string]interface{}{
				"brokerName": brokerName,
			})
		}
		return true
	})
	s.lockNamesrv.Unlock()
}

// key is topic, value is TopicPublishInfo
type TopicPublishInfo struct {
	OrderTopic          bool
	HaveTopicRouterInfo bool
	MqList              []*primitive.MessageQueue
	RouteData           *TopicRouteData
	TopicQueueIndex     int32
}

func (info *TopicPublishInfo) isOK() (bIsTopicOk bool) {
	return len(info.MqList) > 0
}

func (info *TopicPublishInfo) fetchQueueIndex() int {
	length := len(info.MqList)
	if length <= 0 {
		return -1
	}
	qIndex := atomic.AddInt32(&info.TopicQueueIndex, 1)
	return int(qIndex) % length
}

func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
	s.lockNamesrv.Lock()
	defer s.lockNamesrv.Unlock()

	routeData, err := s.queryTopicRouteInfoFromServer(topic)
	if err != nil {
		routeData, err = s.queryTopicRouteInfoFromServer(defaultTopic)
		if err != nil {
			rlog.Warning("query topic route from server error", map[string]interface{}{
				rlog.LogKeyUnderlayError: err,
			})
			return nil, false
		}
	}

	if routeData == nil {
		rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
			rlog.LogKeyTopic: topic,
		})
		return nil, false
	}

	oldRouteData, exist := s.routeDataMap.Load(topic)

	changed := true
	if exist {
		changed = s.topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
	}

	if changed {
		s.routeDataMap.Store(topic, routeData)
		rlog.Info("the topic route info changed", map[string]interface{}{
			rlog.LogKeyTopic:            topic,
			rlog.LogKeyValueChangedFrom: oldRouteData,
			rlog.LogKeyValueChangedTo:   routeData.String(),
		})
		for _, brokerData := range routeData.BrokerDataList {
			s.brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
		}
	}

	return routeData.clone(), changed
}

func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
	for _, brokerData := range routeData.BrokerDataList {
		s.brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
	}
}

func (s *namesrvs) FindBrokerAddrByTopic(topic string) string {
	v, exist := s.routeDataMap.Load(topic)
	if !exist {
		return ""
	}
	routeData := v.(*TopicRouteData)
	if len(routeData.BrokerDataList) == 0 {
		return ""
	}
	i := utils.AbsInt(rand.Int())
	bd := routeData.BrokerDataList[i%len(routeData.BrokerDataList)]
	addr := bd.BrokerAddresses[MasterId]
	if addr == "" && len(bd.BrokerAddresses) > 0 {
		i = i % len(bd.BrokerAddresses)
		for _, v := range bd.BrokerAddresses {
			if i <= 0 {
				addr = v
				break
			}
			i--
		}
	}
	return addr
}

func (s *namesrvs) FindBrokerAddrByName(brokerName string) string {
	bd, exist := s.brokerAddressesMap.Load(brokerName)

	if !exist {
		return ""
	}

	return bd.(*BrokerData).BrokerAddresses[MasterId]
}

func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
	var (
		brokerAddr = ""
		//slave      = false
		//found      = false
	)

	v, exist := s.brokerAddressesMap.Load(brokerName)

	if !exist {
		return nil
	}
	data := v.(*BrokerData)
	if len(data.BrokerAddresses) == 0 {
		return nil
	}

	brokerAddr = data.BrokerAddresses[brokerId]
	//for k, v := range data.BrokerAddresses {
	//	if v != "" {
	//		found = true
	//		if k != MasterId {
	//			slave = true
	//		}
	//		brokerAddr = v
	//		break
	//	}
	//}

	var result *FindBrokerResult
	if brokerAddr != "" {
		result = &FindBrokerResult{
			BrokerAddr:    brokerAddr,
			Slave:         brokerId != 0,
			BrokerVersion: s.findBrokerVersion(brokerName, brokerAddr),
		}
	}

	return result
}

func (s *namesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
	routeData, err := s.queryTopicRouteInfoFromServer(topic)

	if err != nil {
		return nil, err
	}

	mqs := make([]*primitive.MessageQueue, 0)

	for _, qd := range routeData.QueueDataList {
		if queueIsReadable(qd.Perm) {
			for i := 0; i < qd.ReadQueueNums; i++ {
				mqs = append(mqs, &primitive.MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i})
			}
		}
	}
	return mqs, nil
}

func (s *namesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
	var (
		err       error
		routeData *TopicRouteData
	)

	v, exist := s.routeDataMap.Load(topic)
	if !exist {
		routeData, err = s.queryTopicRouteInfoFromServer(topic)
		if err != nil {
			rlog.Error("queryTopicRouteInfoFromServer failed", map[string]interface{}{
				rlog.LogKeyTopic: topic,
			})
			return nil, err
		}
		s.routeDataMap.Store(topic, routeData)
		s.AddBroker(routeData)
	} else {
		routeData = v.(*TopicRouteData)
	}

	if err != nil {
		return nil, err
	}
	publishInfo := s.routeData2PublishInfo(topic, routeData)

	return publishInfo.MqList, nil
}

func (s *namesrvs) AddBrokerVersion(brokerName, brokerAddr string, version int32) {
	s.brokerLock.Lock()
	defer s.brokerLock.Unlock()

	m, exist := s.brokerVersionMap[brokerName]
	if !exist {
		m = make(map[string]int32, 4)
		s.brokerVersionMap[brokerName] = m
	}
	m[brokerAddr] = version
}

func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
	s.brokerLock.RLock()
	defer s.brokerLock.RUnlock()

	versions, exist := s.brokerVersionMap[brokerName]

	if !exist {
		return 0
	}

	return versions[brokerAddr]
}

func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
	request := &GetRouteInfoRequestHeader{
		Topic: topic,
	}

	var (
		response *remote.RemotingCommand
		err      error
	)
	for i := 0; i < s.Size(); i++ {
		rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
		ctx, _ := context.WithTimeout(context.Background(), requestTimeout)
		response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)

		if err == nil {
			break
		}
	}
	if err != nil {
		rlog.Error("connect to namesrv failed.", map[string]interface{}{
			"namesrv": s,
		})
		return nil, err
	}

	switch response.Code {
	case ResSuccess:
		if response.Body == nil {
			return nil, errors.New(response.Remark)
		}
		routeData := &TopicRouteData{}

		err = routeData.decode(string(response.Body))
		if err != nil {
			rlog.Warning("decode TopicRouteData error: %s", map[string]interface{}{
				rlog.LogKeyUnderlayError: err,
			})
			return nil, err
		}
		return routeData, nil
	case ResTopicNotExist:
		return nil, ErrTopicNotExist
	default:
		return nil, errors.New(response.Remark)
	}
}

func (s *namesrvs) topicRouteDataIsChange(oldData *TopicRouteData, newData *TopicRouteData) bool {
	if oldData == nil || newData == nil {
		return true
	}
	oldDataCloned := oldData.clone()
	newDataCloned := newData.clone()

	sort.Slice(oldDataCloned.QueueDataList, func(i, j int) bool {
		return strings.Compare(oldDataCloned.QueueDataList[i].BrokerName, oldDataCloned.QueueDataList[j].BrokerName) > 0
	})
	sort.Slice(oldDataCloned.BrokerDataList, func(i, j int) bool {
		return strings.Compare(oldDataCloned.BrokerDataList[i].BrokerName, oldDataCloned.BrokerDataList[j].BrokerName) > 0
	})
	sort.Slice(newDataCloned.QueueDataList, func(i, j int) bool {
		return strings.Compare(newDataCloned.QueueDataList[i].BrokerName, newDataCloned.QueueDataList[j].BrokerName) > 0
	})
	sort.Slice(newDataCloned.BrokerDataList, func(i, j int) bool {
		return strings.Compare(newDataCloned.BrokerDataList[i].BrokerName, newDataCloned.BrokerDataList[j].BrokerName) > 0
	})

	return !oldDataCloned.equals(newDataCloned)
}

func (s *namesrvs) routeData2PublishInfo(topic string, data *TopicRouteData) *TopicPublishInfo {
	publishInfo := &TopicPublishInfo{
		RouteData:  data,
		OrderTopic: false,
	}

	if data.OrderTopicConf != "" {
		brokers := strings.Split(data.OrderTopicConf, ";")
		for _, broker := range brokers {
			item := strings.Split(broker, ":")
			nums, _ := strconv.Atoi(item[1])
			for i := 0; i < nums; i++ {
				mq := &primitive.MessageQueue{
					Topic:      topic,
					BrokerName: item[0],
					QueueId:    i,
				}
				publishInfo.MqList = append(publishInfo.MqList, mq)
			}
		}

		publishInfo.OrderTopic = true
		return publishInfo
	}

	qds := data.QueueDataList
	sort.Slice(qds, func(i, j int) bool {
		return i-j >= 0
	})

	for _, qd := range qds {
		if !queueIsWriteable(qd.Perm) {
			continue
		}

		var bData *BrokerData
		for _, bd := range data.BrokerDataList {
			if bd.BrokerName == qd.BrokerName {
				bData = bd
				break
			}
		}

		if bData == nil || bData.BrokerAddresses[MasterId] == "" {
			continue
		}

		for i := 0; i < qd.WriteQueueNums; i++ {
			mq := &primitive.MessageQueue{
				Topic:      topic,
				BrokerName: qd.BrokerName,
				QueueId:    i,
			}
			publishInfo.MqList = append(publishInfo.MqList, mq)
		}
	}

	return publishInfo
}

// TopicRouteData TopicRouteData
type TopicRouteData struct {
	OrderTopicConf string
	QueueDataList  []*QueueData  `json:"queueDatas"`
	BrokerDataList []*BrokerData `json:"brokerDatas"`
}

func (routeData *TopicRouteData) decode(data string) error {
	res := gjson.Parse(data)
	err := jsoniter.Unmarshal([]byte(res.Get("queueDatas").String()), &routeData.QueueDataList)

	if err != nil {
		return err
	}

	bds := res.Get("brokerDatas").Array()
	routeData.BrokerDataList = make([]*BrokerData, len(bds))
	for idx, v := range bds {
		bd := &BrokerData{
			BrokerName:      v.Get("brokerName").String(),
			Cluster:         v.Get("cluster").String(),
			BrokerAddresses: make(map[int64]string, 0),
		}
		addrs := v.Get("brokerAddrs").String()
		strs := strings.Split(addrs[1:len(addrs)-1], ",")
		if strs != nil {
			for _, str := range strs {
				i := strings.Index(str, ":")
				if i < 0 {
					continue
				}
				id, _ := strconv.ParseInt(str[0:i], 10, 64)
				bd.BrokerAddresses[id] = strings.Replace(str[i+1:], "\"", "", -1)
			}
		}
		routeData.BrokerDataList[idx] = bd
	}
	return nil
}

func (routeData *TopicRouteData) clone() *TopicRouteData {
	cloned := &TopicRouteData{
		OrderTopicConf: routeData.OrderTopicConf,
		QueueDataList:  make([]*QueueData, len(routeData.QueueDataList)),
		BrokerDataList: make([]*BrokerData, len(routeData.BrokerDataList)),
	}

	for index, value := range routeData.QueueDataList {
		cloned.QueueDataList[index] = value
	}

	for index, value := range routeData.BrokerDataList {
		cloned.BrokerDataList[index] = value
	}

	return cloned
}

func (routeData *TopicRouteData) equals(data *TopicRouteData) bool {
	if len(routeData.BrokerDataList) != len(data.BrokerDataList) {
		return false
	}
	if len(routeData.QueueDataList) != len(data.QueueDataList) {
		return false
	}

	for idx := range routeData.BrokerDataList {
		if !routeData.BrokerDataList[idx].Equals(data.BrokerDataList[idx]) {
			return false
		}
	}

	for idx := range routeData.QueueDataList {
		if !routeData.QueueDataList[idx].Equals(data.QueueDataList[idx]) {
			return false
		}
	}
	return true
}

func (routeData *TopicRouteData) String() string {
	data, _ := jsoniter.Marshal(routeData)
	return string(data)
}

// QueueData QueueData
type QueueData struct {
	BrokerName     string `json:"brokerName"`
	ReadQueueNums  int    `json:"readQueueNums"`
	WriteQueueNums int    `json:"writeQueueNums"`
	Perm           int    `json:"perm"`
	TopicSynFlag   int    `json:"topicSynFlag"`
}

func (q *QueueData) Equals(qd *QueueData) bool {
	if q.BrokerName != qd.BrokerName {
		return false
	}

	if q.ReadQueueNums != qd.ReadQueueNums {
		return false
	}

	if q.WriteQueueNums != qd.WriteQueueNums {
		return false
	}

	if q.Perm != qd.Perm {
		return false
	}

	if q.TopicSynFlag != qd.TopicSynFlag {
		return false
	}

	return true
}

// BrokerData BrokerData
type BrokerData struct {
	Cluster             string           `json:"cluster"`
	BrokerName          string           `json:"brokerName"`
	BrokerAddresses     map[int64]string `json:"brokerAddrs"`
	brokerAddressesLock sync.RWMutex
}

func (b *BrokerData) Equals(bd *BrokerData) bool {
	if b.Cluster != bd.Cluster {
		return false
	}

	if b.BrokerName != bd.BrokerName {
		return false
	}

	for k, v := range b.BrokerAddresses {
		if bd.BrokerAddresses[k] != v {
			return false
		}
	}

	return true
}
