/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package remote

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sync/atomic"

	jsoniter "github.com/json-iterator/go"
)

// opaque is the package-wide counter that NewRemotingCommand increments
// atomically to give every new command its Opaque value.
var opaque int32

const (
	// RPCType is 0; the original note labels it REQUEST_COMMAND.
	// NOTE(review): not referenced in the visible part of this file —
	// presumably a bit position inside RemotingCommand.Flag; confirm
	// against its callers.
	RPCType = 0
	// RPCOneWay is 1; the original note labels it RPC.
	// NOTE(review): not referenced in the visible part of this file —
	// presumably a bit position inside RemotingCommand.Flag; confirm
	// against its callers.
	RPCOneWay = 1
	// ResponseType is the bit mask in RemotingCommand.Flag that marks a
	// command as a response. isResponseType tests it and markResponseType
	// sets it.
	ResponseType = 1
	// _Flag is 0.
	// NOTE(review): unused in the visible code; presumably the default
	// Flag value — confirm.
	_Flag        = 0
	// _Version is the value NewRemotingCommand stores in the Version field
	// of every command it creates.
	_Version     = 317
)

// LanguageCode is the one-byte language tag carried in a RemotingCommand
// header.
type LanguageCode byte

// Language tags known to this package. Any tag that UnmarshalJSON does not
// recognise is mapped to _Unknown.
const (
	_Java    = LanguageCode(0)
	_Go      = LanguageCode(9)
	_Unknown = LanguageCode(127)
)

// MarshalJSON implements the JSON marshaler interface. It always emits the
// JSON string "GO", regardless of the receiver's value.
func (lc LanguageCode) MarshalJSON() ([]byte, error) {
	return []byte(`"GO"`), nil
}

// UnmarshalJSON implements the JSON unmarshaler interface.
//
// b is the raw JSON token, so a string value arrives with its surrounding
// double quotes. Both the quoted and the bare spelling of each known tag are
// accepted; anything else sets the receiver to _Unknown. It never returns an
// error.
func (lc *LanguageCode) UnmarshalJSON(b []byte) error {
	switch string(b) {
	case "JAVA", `"JAVA"`:
		*lc = _Java
	case "GO", `"GO"`:
		*lc = _Go
	default:
		*lc = _Unknown
	}
	return nil
}

// String returns "JAVA" or "GO" for the known tags and "unknown" for every
// other value.
func (lc LanguageCode) String() string {
	switch lc {
	case _Java:
		return "JAVA"
	case _Go:
		return "GO"
	default:
		return "unknown"
	}
}

// RemotingCommand is the message unit handled by encode and decode: a set of
// header fields plus an opaque body. Every field except Body is part of the
// JSON header; Body is excluded from JSON and travels after the header in the
// frame.
type RemotingCommand struct {
	// Code is the command code (the request_code field of the binary header).
	Code      int16             `json:"code"`
	// Language is the language tag of the command's originator.
	Language  LanguageCode      `json:"language"`
	// Version is the version value; NewRemotingCommand sets it to _Version.
	Version   int16             `json:"version"`
	// Opaque is the request identifier; NewRemotingCommand assigns it from
	// the package-level counter.
	Opaque    int32             `json:"opaque"`
	// Flag holds bit flags; the ResponseType bit marks a response.
	Flag      int32             `json:"flag"`
	// Remark is a free-form text field.
	Remark    string            `json:"remark"`
	// ExtFields holds the key/value pairs produced by CustomHeader.Encode.
	ExtFields map[string]string `json:"extFields"`
	// Body is the raw payload that follows the header in the frame.
	Body      []byte            `json:"-"`
}

// CustomHeader is implemented by command-specific headers. NewRemotingCommand
// stores the result of Encode in RemotingCommand.ExtFields.
type CustomHeader interface {
	// Encode returns the header as string key/value pairs.
	Encode() map[string]string
}

// NewRemotingCommand builds a command with the given code and body, tagged
// with the Go language code and the package protocol version, and with a
// fresh Opaque taken from the package-level counter.
//
// When header is non-nil its Encode result becomes ExtFields as-is;
// otherwise ExtFields is an empty, non-nil map.
func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand {
	// Reserve the identifier before asking the header to encode itself.
	id := atomic.AddInt32(&opaque, 1)

	var fields map[string]string
	if header == nil {
		fields = make(map[string]string)
	} else {
		fields = header.Encode()
	}

	return &RemotingCommand{
		Code:      code,
		Language:  _Go,
		Version:   _Version,
		Opaque:    id,
		ExtFields: fields,
		Body:      body,
	}
}

// String renders the command's code, opaque, remark and extension fields on
// one line; the body is not included.
func (command *RemotingCommand) String() string {
	const layout = "Code: %d, opaque: %d, Remark: %s, ExtFields: %v"
	return fmt.Sprintf(layout, command.Code, command.Opaque, command.Remark, command.ExtFields)
}

// isResponseType reports whether every bit of ResponseType is set in Flag.
func (command *RemotingCommand) isResponseType() bool {
	masked := command.Flag & ResponseType
	return masked == ResponseType
}

// markResponseType sets the ResponseType bit in Flag, leaving the other bits
// untouched.
func (command *RemotingCommand) markResponseType() {
	command.Flag |= ResponseType
}

var (
	// jsonSerializer handles headers when the codec type is JsonCodecs.
	jsonSerializer     = &jsonCodec{}
	// rocketMqSerializer handles headers when the codec type is
	// RocketMQCodecs.
	rocketMqSerializer = &rmqCodec{}
	// codecType selects the serializer used by encode and is written by
	// markProtocolType into the first byte of the header-length word. Its
	// zero value equals JsonCodecs.
	codecType          byte
)

// encode serializes command into one wire frame, using the header serializer
// selected by the package-level codecType.
//
// Frame format:
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// + item | frame_size | header_length |         header_body        |     body     +
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// + len  |   4bytes   |     4bytes    | (21 + r_len + e_len) bytes | remain bytes +
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
//
// frame_size counts everything that follows it (header_length, header_body
// and body). The first byte of header_length carries the codec type and the
// remaining three bytes carry the header size, so a header can be at most
// 0xFFFFFF bytes long. The (21 + r_len + e_len) size applies to the
// RocketMQCodecs header; with JsonCodecs the header is a JSON document.
//
// It returns an error when codecType is not a known codec, when the header
// cannot be serialized, or when the serialized header does not fit in the
// 24-bit length field.
func encode(command *RemotingCommand) ([]byte, error) {
	var (
		header []byte
		err    error
	)

	switch codecType {
	case JsonCodecs:
		header, err = jsonSerializer.encodeHeader(command)
	case RocketMQCodecs:
		header, err = rocketMqSerializer.encodeHeader(command)
	default:
		// Mirror decode: refuse to emit a frame for a codec we do not know,
		// instead of silently writing an empty header.
		err = fmt.Errorf("unknown codec type: %d", codecType)
	}
	if err != nil {
		return nil, err
	}

	// markProtocolType keeps only the low 24 bits of the length; a longer
	// header would be truncated on the wire and corrupt the frame.
	if len(header) > 0xFFFFFF {
		return nil, fmt.Errorf("header length %d exceeds the 24-bit limit", len(header))
	}

	frameSize := 4 + len(header) + len(command.Body)
	// The frame_size word itself is not counted in frameSize, hence the
	// extra 4 bytes of capacity.
	buf := bytes.NewBuffer(make([]byte, 0, 4+frameSize))

	if err = binary.Write(buf, binary.BigEndian, int32(frameSize)); err != nil {
		return nil, err
	}

	// Writes of raw bytes into a bytes.Buffer always report a nil error.
	buf.Write(markProtocolType(int32(len(header))))
	buf.Write(header)
	buf.Write(command.Body)

	return buf.Bytes(), nil
}

// decode parses a frame whose leading frame_size word has already been
// removed: data starts with the 4-byte header-length word, followed by the
// header and then the body.
//
// The top byte of the header-length word selects the header codec and the low
// 24 bits give the header size. Whatever remains after the header is copied
// into the command's Body. An error is returned when data is too short or the
// codec byte is not recognised.
func decode(data []byte) (*RemotingCommand, error) {
	reader := bytes.NewBuffer(data)

	var lengthWord int32
	if err := binary.Read(reader, binary.BigEndian, &lengthWord); err != nil {
		return nil, err
	}

	headerSize := lengthWord & 0xFFFFFF
	rawHeader := make([]byte, headerSize)
	if err := binary.Read(reader, binary.BigEndian, &rawHeader); err != nil {
		return nil, err
	}

	var (
		cmd *RemotingCommand
		err error
	)
	kind := byte((lengthWord >> 24) & 0xFF)
	switch kind {
	case JsonCodecs:
		cmd, err = jsonSerializer.decodeHeader(rawHeader)
	case RocketMQCodecs:
		cmd, err = rocketMqSerializer.decodeHeader(rawHeader)
	default:
		err = fmt.Errorf("unknown codec type: %d", kind)
	}
	if err != nil {
		return nil, err
	}

	// Everything after the length word and the header is the body.
	remaining := int32(len(data)) - 4 - headerSize
	if remaining <= 0 {
		return cmd, nil
	}

	payload := make([]byte, remaining)
	if err = binary.Read(reader, binary.BigEndian, &payload); err != nil {
		return nil, err
	}
	cmd.Body = payload
	return cmd, nil
}

// markProtocolType builds the 4-byte header-length word: the current
// codecType in the first byte, followed by the low 24 bits of source in
// big-endian order.
func markProtocolType(source int32) []byte {
	return []byte{
		codecType,
		byte(source >> 16),
		byte(source >> 8),
		byte(source),
	}
}

// Codec identifiers stored in the first byte of a frame's header-length word.
const (
	// JsonCodecs marks a header serialized as JSON.
	JsonCodecs     = byte(0)
	// RocketMQCodecs marks a header serialized in the RocketMQ binary
	// format.
	RocketMQCodecs = byte(1)
)

// serializer converts the header part of a RemotingCommand to and from its
// wire representation. The body is handled separately by encode and decode.
type serializer interface {
	// encodeHeader returns the serialized header of command.
	encodeHeader(command *RemotingCommand) ([]byte, error)
	// decodeHeader parses a serialized header into a new RemotingCommand.
	decodeHeader(data []byte) (*RemotingCommand, error)
}

// jsonCodec is the header serializer for JsonCodecs: the header is the JSON
// encoding of a RemotingCommand. Please refer to
// remoting/protocol/RemotingSerializable.
type jsonCodec struct{}

// encodeHeader marshals command to JSON. The Body field is excluded by its
// struct tag, so only header fields are emitted.
func (c *jsonCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
	encoded, err := jsoniter.Marshal(command)
	if err == nil {
		return encoded, nil
	}
	return nil, err
}

// decodeHeader unmarshals a JSON header into a new RemotingCommand whose
// ExtFields and Body start out empty but non-nil.
func (c *jsonCodec) decodeHeader(header []byte) (*RemotingCommand, error) {
	cmd := &RemotingCommand{
		ExtFields: make(map[string]string),
		Body:      make([]byte, 0),
	}
	if err := jsoniter.Unmarshal(header, cmd); err != nil {
		return nil, err
	}
	return cmd, nil
}

// rmqCodec implements the RocketMQCodecs private protocol; please refer to
// remoting/protocol/RocketMQSerializable.
//
// RocketMQCodecs private protocol header format:
//
// l_flag: language flag
// v_flag: version flag
// r_len: length of remark body
// r_body: data of remark body
// e_len: length of extends fields body
// e_body: data of extends fields
//
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// + item | request_code | l_flag | v_flag | opaque | request_flag |  r_len  |   r_body    |  e_len  |    e_body   +
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// + len  |    2bytes    |  1byte | 2bytes | 4bytes |    4 bytes   | 4 bytes | r_len bytes | 4 bytes | e_len bytes +
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
const (
	// headerFixedLength is the combined size in bytes of the fixed-width
	// header fields: request_code (2) + l_flag (1) + v_flag (2) + opaque (4)
	// + request_flag (4) + r_len (4) + e_len (4) = 21. The variable-length
	// r_body and e_body are not included.
	headerFixedLength = 21
)

// rmqCodec is the header serializer for the RocketMQCodecs binary format.
type rmqCodec struct{}

// encodeHeader serializes the header of command in the RocketMQ binary
// layout: the fixed-width fields in wire order, then the remark bytes, then
// the length-prefixed extension fields. All integers are big-endian. The
// language byte is always written as _Go.
func (c *rmqCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
	extBytes, err := c.encodeMaps(command.ExtFields)
	if err != nil {
		return nil, err
	}

	remark := []byte(command.Remark)
	out := bytes.NewBuffer(make([]byte, 0, headerFixedLength+len(remark)+len(extBytes)))

	fixed := []interface{}{
		// request code, 2 bytes
		int16(command.Code),
		// language flag, 1 byte
		_Go,
		// version flag, 2 bytes
		int16(command.Version),
		// opaque (request identifier), 4 bytes
		command.Opaque,
		// request flag, 4 bytes
		command.Flag,
		// remark length, 4 bytes
		int32(len(remark)),
	}
	for _, field := range fixed {
		if err = binary.Write(out, binary.BigEndian, field); err != nil {
			return nil, err
		}
	}

	// remark body, only present when non-empty
	if len(remark) > 0 {
		if err = binary.Write(out, binary.BigEndian, remark); err != nil {
			return nil, err
		}
	}

	// extension fields: 4-byte length followed by the encoded entries
	if err = binary.Write(out, binary.BigEndian, int32(len(extBytes))); err != nil {
		return nil, err
	}
	if len(extBytes) > 0 {
		if err = binary.Write(out, binary.BigEndian, extBytes); err != nil {
			return nil, err
		}
	}

	return out.Bytes(), nil
}

// encodeMaps serializes maps as a sequence of entries, each laid out as a
// 2-byte big-endian key length, the key bytes, a 4-byte big-endian value
// length and the value bytes. Entries follow Go's map iteration order, which
// is unspecified.
//
// A nil or empty map yields an empty, non-nil slice. An error is returned
// when a key or value is too long for its length field, because writing a
// truncated length would corrupt the frame.
func (c *rmqCodec) encodeMaps(maps map[string]string) ([]byte, error) {
	// len of a nil map is 0, so no separate nil check is needed.
	if len(maps) == 0 {
		return []byte{}, nil
	}

	const (
		maxKeyLen   = 1<<15 - 1 // largest value a signed 16-bit length can hold
		maxValueLen = 1<<31 - 1 // largest value a signed 32-bit length can hold
	)

	extFieldsBuf := bytes.NewBuffer([]byte{})
	var err error
	for key, value := range maps {
		if len(key) > maxKeyLen {
			return nil, fmt.Errorf("ext field key length %d exceeds %d bytes", len(key), maxKeyLen)
		}
		if len(value) > maxValueLen {
			return nil, fmt.Errorf("ext field value length %d exceeds %d bytes", len(value), maxValueLen)
		}

		err = binary.Write(extFieldsBuf, binary.BigEndian, int16(len(key)))
		if err != nil {
			return nil, err
		}
		err = binary.Write(extFieldsBuf, binary.BigEndian, []byte(key))
		if err != nil {
			return nil, err
		}

		err = binary.Write(extFieldsBuf, binary.BigEndian, int32(len(value)))
		if err != nil {
			return nil, err
		}
		err = binary.Write(extFieldsBuf, binary.BigEndian, []byte(value))
		if err != nil {
			return nil, err
		}
	}
	return extFieldsBuf.Bytes(), nil
}

// decodeHeader parses a header in the RocketMQ binary layout into a new
// RemotingCommand. All integers are big-endian.
//
// Every length read from data is validated against the bytes that actually
// remain before it is used, so a malformed header produces an error instead
// of a panic or an oversized allocation. A remark or extension-field length
// of zero or less is treated as "absent"; in that case Remark stays empty and
// ExtFields stays nil.
func (c *rmqCodec) decodeHeader(data []byte) (*RemotingCommand, error) {
	command := &RemotingCommand{}
	buf := bytes.NewBuffer(data)

	// short code
	if err := binary.Read(buf, binary.BigEndian, &command.Code); err != nil {
		return nil, err
	}

	// byte language
	var languageCode byte
	if err := binary.Read(buf, binary.BigEndian, &languageCode); err != nil {
		return nil, err
	}
	command.Language = LanguageCode(languageCode)

	// short version
	if err := binary.Read(buf, binary.BigEndian, &command.Version); err != nil {
		return nil, err
	}

	// int opaque
	if err := binary.Read(buf, binary.BigEndian, &command.Opaque); err != nil {
		return nil, err
	}

	// int flag
	if err := binary.Read(buf, binary.BigEndian, &command.Flag); err != nil {
		return nil, err
	}

	// String remark
	var remarkLen int32
	if err := binary.Read(buf, binary.BigEndian, &remarkLen); err != nil {
		return nil, err
	}
	if remarkLen > 0 {
		if int(remarkLen) > buf.Len() {
			return nil, fmt.Errorf("remark length %d exceeds remaining %d bytes", remarkLen, buf.Len())
		}
		command.Remark = string(buf.Next(int(remarkLen)))
	}

	// extension fields
	var extFieldsLen int32
	if err := binary.Read(buf, binary.BigEndian, &extFieldsLen); err != nil {
		return nil, err
	}
	if extFieldsLen <= 0 {
		return command, nil
	}
	if int(extFieldsLen) > buf.Len() {
		return nil, fmt.Errorf("ext fields length %d exceeds remaining %d bytes", extFieldsLen, buf.Len())
	}

	command.ExtFields = make(map[string]string)
	// Next returns a view into buf; it is only read from here on.
	fields := bytes.NewBuffer(buf.Next(int(extFieldsLen)))
	for fields.Len() > 0 {
		var kLen int16
		if err := binary.Read(fields, binary.BigEndian, &kLen); err != nil {
			return nil, err
		}
		if kLen < 0 || int(kLen) > fields.Len() {
			return nil, fmt.Errorf("invalid ext field key length: %d", kLen)
		}
		key, err := getExtFieldsData(fields, int32(kLen))
		if err != nil {
			return nil, err
		}

		var vLen int32
		if err = binary.Read(fields, binary.BigEndian, &vLen); err != nil {
			return nil, err
		}
		if vLen < 0 || int(vLen) > fields.Len() {
			return nil, fmt.Errorf("invalid ext field value length: %d", vLen)
		}
		value, err := getExtFieldsData(fields, vLen)
		if err != nil {
			return nil, err
		}

		command.ExtFields[key] = value
	}

	return command, nil
}

// getExtFieldsData consumes the next length bytes of buff and returns them as
// a string.
//
// length originates from the wire, so it is validated before use: a negative
// value, or one larger than the number of unread bytes in buff, yields an
// error without allocating or consuming anything. A length of zero returns
// the empty string.
func getExtFieldsData(buff *bytes.Buffer, length int32) (string, error) {
	if length < 0 {
		return "", fmt.Errorf("invalid ext field length: %d", length)
	}
	if int(length) > buff.Len() {
		return "", fmt.Errorf("ext field length %d exceeds remaining %d bytes", length, buff.Len())
	}

	// The string conversion copies, so the result does not alias buff.
	return string(buff.Next(int(length))), nil
}
