/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/apache/rocketmq-client-go/internal/remote"
	"github.com/apache/rocketmq-client-go/internal/utils"
	"github.com/apache/rocketmq-client-go/primitive"
	"github.com/apache/rocketmq-client-go/rlog"
)

const (
	defaultTraceRegionID = "DefaultRegion"

	// tracing message switch
	_TranceOff = "false"

	// Pulling topic information interval from the named server
	_PullNameServerInterval = 30 * time.Second

	// Sending heart beat interval to all broker
	_HeartbeatBrokerInterval = 30 * time.Second

	// Offset persistent interval for consumer
	_PersistOffset = 5 * time.Second

	// Rebalance interval
	_RebalanceInterval = 10 * time.Second
)

var (
	ErrServiceState = errors.New("service close is not running, please check")

	_VIPChannelEnable = false
)

func init() {
	if os.Getenv("com.rocketmq.sendMessageWithVIPChannel") != "" {
		value, err := strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
		if err == nil {
			_VIPChannelEnable = value
		}
	}
}

type InnerProducer interface {
	PublishTopicList() []string
	UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
	IsPublishTopicNeedUpdate(topic string) bool
	IsUnitMode() bool
}

type InnerConsumer interface {
	PersistConsumerOffset() error
	UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
	IsSubscribeTopicNeedUpdate(topic string) bool
	SubscriptionDataList() []*SubscriptionData
	Rebalance()
	IsUnitMode() bool
}

func DefaultClientOptions() ClientOptions {
	opts := ClientOptions{
		InstanceName: "DEFAULT",
		RetryTimes:   3,
		ClientIP:     utils.LocalIP,
	}
	return opts
}

type ClientOptions struct {
	GroupName         string
	NameServerAddrs   primitive.NamesrvAddr
	Namesrv           *namesrvs
	ClientIP          string
	InstanceName      string
	UnitMode          bool
	UnitName          string
	VIPChannelEnabled bool
	RetryTimes        int
	Interceptors      []primitive.Interceptor
	Credentials       primitive.Credentials
	Namespace         string
}

func (opt *ClientOptions) ChangeInstanceNameToPID() {
	if opt.InstanceName == "DEFAULT" {
		opt.InstanceName = strconv.Itoa(os.Getpid())
	}
}

func (opt *ClientOptions) String() string {
	return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
		"UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v]", opt.ClientIP,
		opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled)
}

//go:generate mockgen -source client.go -destination mock_client.go -self_package github.com/apache/rocketmq-client-go/internal  --package internal RMQClient
type RMQClient interface {
	Start()
	Shutdown()

	ClientID() string

	RegisterProducer(group string, producer InnerProducer)
	InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
		timeoutMillis time.Duration) (*remote.RemotingCommand, error)
	InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
		timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error
	InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand,
		timeoutMillis time.Duration) error
	CheckClientInBroker()
	SendHeartbeatToAllBrokerWithLock()
	UpdateTopicRouteInfo()

	ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) error

	RegisterConsumer(group string, consumer InnerConsumer) error
	UnregisterConsumer(group string)
	PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
	PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
	RebalanceImmediately()
	UpdatePublishInfo(topic string, data *TopicRouteData)
}

var _ RMQClient = new(rmqClient)

type rmqClient struct {
	option ClientOptions
	// group -> InnerProducer
	producerMap sync.Map

	// group -> InnerConsumer
	consumerMap sync.Map
	once        sync.Once

	remoteClient remote.RemotingClient
	hbMutex      sync.Mutex
	close        bool
	namesrvs     *namesrvs
	done         chan struct{}
	shutdownOnce sync.Once
}

var clientMap sync.Map

func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
	client := &rmqClient{
		option:       option,
		remoteClient: remote.NewRemotingClient(),
		namesrvs:     option.Namesrv,
		done:         make(chan struct{}),
	}
	actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
	if !loaded {
		client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive broker's notification to consumer group", map[string]interface{}{
				rlog.LogKeyConsumerGroup: req.ExtFields["consumerGroup"],
			})
			client.RebalanceImmediately()
			return nil
		})
		client.remoteClient.RegisterRequestFunc(ReqCheckTransactionState, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			header := new(CheckTransactionStateRequestHeader)
			header.Decode(req.ExtFields)
			msgExts := primitive.DecodeMessage(req.Body)
			if len(msgExts) == 0 {
				rlog.Warning("checkTransactionState, decode message failed", nil)
				return nil
			}
			msgExt := msgExts[0]
			// TODO: add namespace support
			transactionID := msgExt.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
			if len(transactionID) > 0 {
				msgExt.TransactionId = transactionID
			}
			group := msgExt.GetProperty(primitive.PropertyProducerGroup)
			if group == "" {
				rlog.Warning("checkTransactionState, pick producer group failed", nil)
				return nil
			}
			if option.GroupName != group {
				rlog.Warning("producer group is not equal", nil)
				return nil
			}
			callback := CheckTransactionStateCallback{
				Addr:   addr,
				Msg:    *msgExt,
				Header: *header,
			}
			callbackCh <- callback
			return nil
		})

		client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive get consumer running info request...", nil)
			res := remote.NewRemotingCommand(ResError, nil, nil)
			res.Remark = "the go client has not supported consumer running info"
			return res
		})
	}
	return actual.(*rmqClient)
}

func (c *rmqClient) Start() {
	//ctx, cancel := context.WithCancel(context.Background())
	//c.cancel = cancel
	c.once.Do(func() {
		if !c.option.Credentials.IsEmpty() {
			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
		}
		// TODO fetchNameServerAddr
		go func() {}()

		// schedule update route info
		go func() {
			// delay
			ticker := time.NewTicker(_PullNameServerInterval)
			defer ticker.Stop()
			time.Sleep(50 * time.Millisecond)
			for {
				select {
				case <-ticker.C:
					c.UpdateTopicRouteInfo()
				case <-c.done:
					rlog.Info("The RMQClient stopping update topic route info.", map[string]interface{}{
						"clientID": c.ClientID(),
					})
					return
				}
			}
		}()

		// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
		go func() {
			ticker := time.NewTicker(_HeartbeatBrokerInterval)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					c.namesrvs.cleanOfflineBroker()
					c.SendHeartbeatToAllBrokerWithLock()
				case <-c.done:
					rlog.Info("The RMQClient stopping clean off line broker and heart beat", map[string]interface{}{
						"clientID": c.ClientID(),
					})
					return
				}
			}
		}()

		// schedule persist offset
		go func() {
			ticker := time.NewTicker(_PersistOffset)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					c.consumerMap.Range(func(key, value interface{}) bool {
						consumer := value.(InnerConsumer)
						err := consumer.PersistConsumerOffset()
						if err != nil {
							rlog.Error("persist offset failed", map[string]interface{}{
								rlog.LogKeyUnderlayError: err,
							})
						}
						return true
					})
				case <-c.done:
					rlog.Info("The RMQClient stopping persist offset", map[string]interface{}{
						"clientID": c.ClientID(),
					})
					return
				}
			}
		}()

		go func() {
			ticker := time.NewTicker(_RebalanceInterval)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					c.RebalanceImmediately()
				case <-c.done:
					rlog.Info("The RMQClient stopping do rebalance", map[string]interface{}{
						"clientID": c.ClientID(),
					})
					return
				}
			}
		}()
	})
}

func (c *rmqClient) Shutdown() {
	c.shutdownOnce.Do(func() {
		close(c.done)
		c.close = true
		c.remoteClient.ShutDown()
	})
}

func (c *rmqClient) ClientID() string {
	id := c.option.ClientIP + "@" + c.option.InstanceName
	if c.option.UnitName != "" {
		id += "@" + c.option.UnitName
	}
	return id
}

func (c *rmqClient) InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
	timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
	if c.close {
		return nil, ErrServiceState
	}
	return c.remoteClient.InvokeSync(ctx, addr, request, timeoutMillis)
}

func (c *rmqClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
	timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
	if c.close {
		return ErrServiceState
	}
	return c.remoteClient.InvokeAsync(ctx, addr, request, timeoutMillis, func(future *remote.ResponseFuture) {
		f(future.ResponseCommand, future.Err)
	})

}

func (c *rmqClient) InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand,
	timeoutMillis time.Duration) error {
	if c.close {
		return ErrServiceState
	}
	return c.remoteClient.InvokeOneWay(ctx, addr, request, timeoutMillis)
}

func (c *rmqClient) CheckClientInBroker() {
}

// TODO
func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
	c.hbMutex.Lock()
	defer c.hbMutex.Unlock()
	hbData := NewHeartbeatData(c.ClientID())

	c.producerMap.Range(func(key, value interface{}) bool {
		pData := producerData{
			GroupName: key.(string),
		}
		hbData.ProducerDatas.Add(pData)
		return true
	})

	c.consumerMap.Range(func(key, value interface{}) bool {
		consumer := value.(InnerConsumer)
		cData := consumerData{
			GroupName:         key.(string),
			CType:             "PUSH",
			MessageModel:      "CLUSTERING",
			Where:             "CONSUME_FROM_FIRST_OFFSET",
			UnitMode:          consumer.IsUnitMode(),
			SubscriptionDatas: consumer.SubscriptionDataList(),
		}
		hbData.ConsumerDatas.Add(cData)
		return true
	})
	if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
		rlog.Info("sending heartbeat, but no producer and no consumer", nil)
		return
	}
	c.namesrvs.brokerAddressesMap.Range(func(key, value interface{}) bool {
		brokerName := key.(string)
		data := value.(*BrokerData)
		for id, addr := range data.BrokerAddresses {
			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
			response, err := c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
			if err != nil {
				rlog.Warning("send heart beat to broker error", map[string]interface{}{
					rlog.LogKeyUnderlayError: err,
				})
				return true
			}
			if response.Code == ResSuccess {
				v, exist := c.namesrvs.brokerVersionMap.Load(brokerName)
				var m map[string]int32
				if exist {
					m = v.(map[string]int32)
				} else {
					m = make(map[string]int32, 4)
					c.namesrvs.brokerVersionMap.Store(brokerName, m)
				}
				m[brokerName] = int32(response.Version)
				rlog.Debug("send heart beat to broker success", map[string]interface{}{
					"brokerName": brokerName,
					"brokerId":   id,
					"brokerAddr": addr,
				})
			}
		}
		return true
	})
}

func (c *rmqClient) UpdateTopicRouteInfo() {
	publishTopicSet := make(map[string]bool, 0)
	c.producerMap.Range(func(key, value interface{}) bool {
		producer := value.(InnerProducer)
		list := producer.PublishTopicList()
		for idx := range list {
			publishTopicSet[list[idx]] = true
		}
		return true
	})
	for topic := range publishTopicSet {
		c.UpdatePublishInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
	}

	subscribedTopicSet := make(map[string]bool, 0)
	c.consumerMap.Range(func(key, value interface{}) bool {
		consumer := value.(InnerConsumer)
		list := consumer.SubscriptionDataList()
		for idx := range list {
			subscribedTopicSet[list[idx].Topic] = true
		}
		return true
	})

	for topic := range subscribedTopicSet {
		c.updateSubscribeInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
	}
}

func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) error {
	var status primitive.SendStatus
	switch cmd.Code {
	case ResFlushDiskTimeout:
		status = primitive.SendFlushDiskTimeout
	case ResFlushSlaveTimeout:
		status = primitive.SendFlushSlaveTimeout
	case ResSlaveNotAvailable:
		status = primitive.SendSlaveNotAvailable
	case ResSuccess:
		status = primitive.SendOK
	default:
		status = primitive.SendUnknownError
		return errors.New(cmd.Remark)
	}

	msgIDs := make([]string, 0)
	for i := 0; i < len(msgs); i++ {
		msgIDs = append(msgIDs, msgs[i].GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex))
	}

	regionId := cmd.ExtFields[primitive.PropertyMsgRegion]
	trace := cmd.ExtFields[primitive.PropertyTraceSwitch]

	if regionId == "" {
		regionId = defaultTraceRegionID
	}

	qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
	off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)

	resp.Status = status
	resp.MsgID = cmd.ExtFields["msgId"]
	resp.OffsetMsgID = cmd.ExtFields["msgId"]
	resp.MessageQueue = &primitive.MessageQueue{
		Topic:      msgs[0].Topic,
		BrokerName: brokerName,
		QueueId:    qId,
	}
	resp.QueueOffset = off
	//TransactionID: sendResponse.TransactionId,
	resp.RegionID = regionId
	resp.TraceOn = trace != "" && trace != _TranceOff
	return nil
}

// PullMessage with sync
func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
	res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 10*time.Second)
	if err != nil {
		return nil, err
	}

	return c.processPullResponse(res)
}

func (c *rmqClient) processPullResponse(response *remote.RemotingCommand) (*primitive.PullResult, error) {

	pullResult := &primitive.PullResult{}
	switch response.Code {
	case ResSuccess:
		pullResult.Status = primitive.PullFound
	case ResPullNotFound:
		pullResult.Status = primitive.PullNoNewMsg
	case ResPullRetryImmediately:
		pullResult.Status = primitive.PullNoMsgMatched
	case ResPullOffsetMoved:
		pullResult.Status = primitive.PullOffsetIllegal
	default:
		return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", response.Code, response.Remark)
	}

	c.decodeCommandCustomHeader(pullResult, response)
	pullResult.SetBody(response.Body)

	return pullResult, nil
}

func (c *rmqClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *remote.RemotingCommand) {
	v, exist := cmd.ExtFields["maxOffset"]
	if exist {
		pr.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
	}

	v, exist = cmd.ExtFields["minOffset"]
	if exist {
		pr.MinOffset, _ = strconv.ParseInt(v, 10, 64)
	}

	v, exist = cmd.ExtFields["nextBeginOffset"]
	if exist {
		pr.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
	}

	v, exist = cmd.ExtFields["suggestWhichBrokerId"]
	if exist {
		pr.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
	}
}

// PullMessageAsync pull message async
func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error {
	return nil
}

func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error {
	_, exist := c.consumerMap.Load(group)
	if exist {
		rlog.Warning("the consumer group exist already", map[string]interface{}{
			rlog.LogKeyConsumerGroup: group,
		})
		return fmt.Errorf("the consumer group exist already")
	}
	c.consumerMap.Store(group, consumer)
	return nil
}

func (c *rmqClient) UnregisterConsumer(group string) {
}

func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
	c.producerMap.Store(group, producer)
}

func (c *rmqClient) UnregisterProducer(group string) {
}

func (c *rmqClient) SelectProducer(group string) InnerProducer {
	return nil
}

func (c *rmqClient) SelectConsumer(group string) InnerConsumer {
	return nil
}

func (c *rmqClient) RebalanceImmediately() {
	c.consumerMap.Range(func(key, value interface{}) bool {
		consumer := value.(InnerConsumer)
		consumer.Rebalance()
		return true
	})
}

func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
	if data == nil {
		return
	}
	if !c.isNeedUpdatePublishInfo(topic) {
		return
	}
	c.producerMap.Range(func(key, value interface{}) bool {
		p := value.(InnerProducer)
		publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
		publishInfo.HaveTopicRouterInfo = true
		p.UpdateTopicPublishInfo(topic, publishInfo)
		return true
	})
}

func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool {
	var result bool
	c.producerMap.Range(func(key, value interface{}) bool {
		p := value.(InnerProducer)
		if p.IsPublishTopicNeedUpdate(topic) {
			result = true
			return false
		}
		return true
	})
	return result
}

func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
	if data == nil {
		return
	}
	if !c.isNeedUpdateSubscribeInfo(topic) {
		return
	}
	c.consumerMap.Range(func(key, value interface{}) bool {
		consumer := value.(InnerConsumer)
		// TODO
		consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
		return true
	})
}

func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool {
	var result bool
	c.consumerMap.Range(func(key, value interface{}) bool {
		consumer := value.(InnerConsumer)
		if consumer.IsSubscribeTopicNeedUpdate(topic) {
			result = true
			return false
		}
		return true
	})
	return result
}

func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue {
	list := make([]*primitive.MessageQueue, 0)
	for idx := range data.QueueDataList {
		qd := data.QueueDataList[idx]
		if queueIsReadable(qd.Perm) {
			for i := 0; i < qd.ReadQueueNums; i++ {
				list = append(list, &primitive.MessageQueue{
					Topic:      topic,
					BrokerName: qd.BrokerName,
					QueueId:    i,
				})
			}
		}
	}
	return list
}

func encodeMessages(message []*primitive.Message) []byte {
	var buffer bytes.Buffer
	index := 0
	for index < len(message) {
		buffer.Write(message[index].Body)
		index++
	}
	return buffer.Bytes()
}

func brokerVIPChannel(brokerAddr string) string {
	if !_VIPChannelEnable {
		return brokerAddr
	}
	var brokerAddrNew strings.Builder
	ipAndPort := strings.Split(brokerAddr, ":")
	port, err := strconv.Atoi(ipAndPort[1])
	if err != nil {
		return ""
	}
	brokerAddrNew.WriteString(ipAndPort[0])
	brokerAddrNew.WriteString(":")
	brokerAddrNew.WriteString(strconv.Itoa(port - 2))
	return brokerAddrNew.String()
}
