blob: f756c11f34da24f26d34323ece51cf87625a6da2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remote
import (
"bytes"
"encoding/binary"
"fmt"
"sync/atomic"
jsoniter "github.com/json-iterator/go"
)
// opaque is the package-wide request counter. NewRemotingCommand atomically
// increments it and stores the new value in the command's Opaque field.
var opaque int32
const (
// RPCType has value 0; the original note labels it REQUEST_COMMAND.
// NOTE(review): not referenced in this file - presumably the Flag value of an
// ordinary request; confirm against callers.
RPCType = 0
// RPCOneWay has value 1; the original note labels it RPC.
// NOTE(review): not referenced in this file - presumably marks a one-way
// request that expects no reply; confirm against callers.
RPCOneWay = 1
// ResponseType is the bit that markResponseType sets and isResponseType tests
// in RemotingCommand.Flag.
ResponseType = 1
// _Flag is 0. NOTE(review): unused in this file - presumably the default Flag
// of a new command; confirm.
_Flag = 0
// _Version is the Version that NewRemotingCommand stamps on every command.
_Version = 317
)
// LanguageCode identifies the implementation language recorded in a
// RemotingCommand. It is a single byte so it can be written directly into the
// binary header.
type LanguageCode byte

const (
	_Java    = LanguageCode(0)
	_Go      = LanguageCode(9)
	_Unknown = LanguageCode(127)
)

// MarshalJSON implements the JSON marshaler contract. It always emits the JSON
// string "GO", regardless of the receiver's value.
func (lc LanguageCode) MarshalJSON() ([]byte, error) {
	return []byte(`"GO"`), nil
}

// UnmarshalJSON implements the JSON unmarshaler contract. The decoder hands
// over the raw JSON value, which for a string still carries its surrounding
// double quotes, so both the quoted and the bare spelling of each known
// language are accepted. Any other input maps to _Unknown; the method never
// returns an error.
func (lc *LanguageCode) UnmarshalJSON(b []byte) error {
	switch string(b) {
	case "JAVA", `"JAVA"`:
		*lc = _Java
	case "GO", `"GO"`:
		*lc = _Go
	default:
		*lc = _Unknown
	}
	return nil
}

// String returns "JAVA" or "GO" for the known codes and "unknown" otherwise.
func (lc LanguageCode) String() string {
	switch lc {
	case _Java:
		return "JAVA"
	case _Go:
		return "GO"
	default:
		return "unknown"
	}
}
// RemotingCommand is one protocol message: the header fields below plus an
// optional Body. The JSON tags name the header fields for the JSON codec; Body
// is excluded from JSON (tag "-") and is appended after the header by encode.
type RemotingCommand struct {
// Code is the request code; it is the first 2 bytes of the binary header.
Code int16 `json:"code"`
// Language is the language code read from a decoded header. Both encoders in
// this file write the Go code regardless of this field's value.
Language LanguageCode `json:"language"`
// Version is the protocol version; NewRemotingCommand sets it to _Version.
Version int16 `json:"version"`
// Opaque is the request identifier; NewRemotingCommand assigns it from an
// atomically incremented package counter.
Opaque int32 `json:"opaque"`
// Flag is a bit field; the ResponseType bit marks the command as a response.
Flag int32 `json:"flag"`
// Remark is free-form text carried in the header.
Remark string `json:"remark"`
// ExtFields holds string key/value header extensions; NewRemotingCommand
// fills it from CustomHeader.Encode when a header is supplied.
ExtFields map[string]string `json:"extFields"`
// Body is the payload that follows the header in the frame.
Body []byte `json:"-"`
}
// CustomHeader is implemented by request-specific headers. NewRemotingCommand
// stores the map returned by Encode as the command's ExtFields.
type CustomHeader interface {
// Encode returns the header as string key/value pairs.
Encode() map[string]string
}
// NewRemotingCommand builds a command with the given code and body, tagged
// with the Go language code and the package protocol version. Each call takes
// the next value of the package-wide opaque counter as the command's Opaque.
//
// When header is nil, ExtFields is an empty non-nil map; otherwise ExtFields
// is exactly what header.Encode returns.
func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand {
	cmd := new(RemotingCommand)
	cmd.Code = code
	cmd.Language = _Go
	cmd.Version = _Version
	cmd.Opaque = atomic.AddInt32(&opaque, 1)
	cmd.Body = body

	if header == nil {
		cmd.ExtFields = make(map[string]string)
		return cmd
	}
	cmd.ExtFields = header.Encode()
	return cmd
}
// String renders the command's code, opaque id, remark and extension fields
// on a single line.
func (command *RemotingCommand) String() string {
	const layout = "Code: %d, opaque: %d, Remark: %s, ExtFields: %v"
	return fmt.Sprintf(layout, command.Code, command.Opaque, command.Remark, command.ExtFields)
}
// isResponseType reports whether the ResponseType bit is set in Flag.
func (command *RemotingCommand) isResponseType() bool {
	masked := command.Flag & ResponseType
	return masked == ResponseType
}
// markResponseType sets the ResponseType bit in Flag, leaving other bits intact.
func (command *RemotingCommand) markResponseType() {
	command.Flag |= ResponseType
}
var (
// jsonSerializer encodes and decodes headers of frames marked JsonCodecs.
jsonSerializer = &jsonCodec{}
// rocketMqSerializer encodes and decodes headers of frames marked RocketMQCodecs.
rocketMqSerializer = &rmqCodec{}
// codecType selects the serializer used by encode and is the byte that
// markProtocolType places first in the header-length field. Its zero value
// equals JsonCodecs.
codecType byte
)
// encode serializes command into one wire frame, using the header codec
// selected by the package-level codecType.
//
// Frame format:
//
//	+------------+---------------+---------------------+--------------+
//	| frame_size | header_length | header_body         | body         |
//	+------------+---------------+---------------------+--------------+
//	| 4 bytes    | 4 bytes       | header_length bytes | remain bytes |
//	+------------+---------------+---------------------+--------------+
//
// frame_size is big-endian and counts everything after itself (the 4-byte
// header_length field, the header and the body). The first byte of
// header_length carries the codec type and the remaining three bytes carry the
// header size. With the RocketMQ codec the header is (21 + r_len + e_len) bytes.
//
// It returns an error when the header codec fails, when codecType names no
// known codec, or when the encoded header does not fit in the 24-bit length.
func encode(command *RemotingCommand) ([]byte, error) {
	var (
		header []byte
		err    error
	)
	switch codecType {
	case JsonCodecs:
		header, err = jsonSerializer.encodeHeader(command)
	case RocketMQCodecs:
		header, err = rocketMqSerializer.encodeHeader(command)
	default:
		// decode rejects such frames, so refuse to produce one.
		err = fmt.Errorf("unknown codec type: %d", codecType)
	}
	if err != nil {
		return nil, err
	}

	// The header size shares its 4-byte field with the codec type, leaving
	// only 24 bits; a larger value would be silently truncated on the wire.
	const maxHeaderLen = 0xFFFFFF
	if len(header) > maxHeaderLen {
		return nil, fmt.Errorf("header length %d exceeds the 24-bit limit %d", len(header), maxHeaderLen)
	}

	frameSize := 4 + len(header) + len(command.Body)
	// Capacity covers the frame_size field itself plus frameSize payload
	// bytes, so the appends below never reallocate.
	frame := make([]byte, 4, 4+frameSize)
	binary.BigEndian.PutUint32(frame, uint32(frameSize))
	frame = append(frame, markProtocolType(int32(len(header)))...)
	frame = append(frame, header...)
	frame = append(frame, command.Body...)
	return frame, nil
}
// decode parses a frame that has already had its leading frame_size field
// removed: data starts with the 4-byte header_length field, followed by the
// header and then the body.
//
// The top byte of header_length selects the header codec and the low three
// bytes give the header size. Whatever follows the header is copied into the
// command's Body. An error is returned when data is too short or the codec
// byte is not recognized.
func decode(data []byte) (*RemotingCommand, error) {
	reader := bytes.NewBuffer(data)

	var lengthField int32
	if err := binary.Read(reader, binary.BigEndian, &lengthField); err != nil {
		return nil, err
	}
	headerLen := lengthField & 0xFFFFFF
	codec := byte((lengthField >> 24) & 0xFF)

	// The header is read before the codec is checked, so a truncated frame
	// is reported ahead of an unknown codec.
	headerBytes := make([]byte, headerLen)
	if err := binary.Read(reader, binary.BigEndian, &headerBytes); err != nil {
		return nil, err
	}

	var (
		cmd *RemotingCommand
		err error
	)
	switch codec {
	case JsonCodecs:
		cmd, err = jsonSerializer.decodeHeader(headerBytes)
	case RocketMQCodecs:
		cmd, err = rocketMqSerializer.decodeHeader(headerBytes)
	default:
		err = fmt.Errorf("unknown codec type: %d", codec)
	}
	if err != nil {
		return nil, err
	}

	if remaining := int32(len(data)) - 4 - headerLen; remaining > 0 {
		body := make([]byte, remaining)
		if err := binary.Read(reader, binary.BigEndian, &body); err != nil {
			return nil, err
		}
		cmd.Body = body
	}
	return cmd, nil
}
// markProtocolType builds the 4-byte header_length field: the current
// codecType in the first byte, followed by the low 24 bits of source in
// big-endian order. Bits of source above the low 24 are dropped.
func markProtocolType(source int32) []byte {
	return []byte{
		codecType,
		byte(source >> 16),
		byte(source >> 8),
		byte(source),
	}
}
// Codec identifiers carried in the first byte of a frame's header-length field.
const (
// JsonCodecs marks a header handled by jsonSerializer.
JsonCodecs = byte(0)
// RocketMQCodecs marks a header handled by rocketMqSerializer.
RocketMQCodecs = byte(1)
)
// serializer converts between a RemotingCommand and the bytes of its frame
// header. Both methods deal with the header only; the command body is handled
// by encode and decode.
type serializer interface {
// encodeHeader returns the serialized header of command.
encodeHeader(command *RemotingCommand) ([]byte, error)
// decodeHeader rebuilds a command from serialized header bytes.
decodeHeader(data []byte) (*RemotingCommand, error)
}
// jsonCodec serializes command headers as JSON; please refer to
// remoting/protocol/RemotingSerializable.
type jsonCodec struct{}

// encodeHeader marshals command to JSON. Fields tagged json:"-" (the body) are
// left out of the result.
func (c *jsonCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
	encoded, err := jsoniter.Marshal(command)
	if err == nil {
		return encoded, nil
	}
	return nil, err
}

// decodeHeader unmarshals a JSON header into a new command. ExtFields and Body
// start out as empty, non-nil values before the JSON is applied.
func (c *jsonCodec) decodeHeader(header []byte) (*RemotingCommand, error) {
	command := &RemotingCommand{
		ExtFields: make(map[string]string),
		Body:      make([]byte, 0),
	}
	if err := jsoniter.Unmarshal(header, command); err != nil {
		return nil, err
	}
	return command, nil
}
// rmqCodec implements the RocketMQCodecs private (binary) protocol; please refer to remoting/protocol/RocketMQSerializable.
// RocketMQCodecs private protocol header format:
//
// l_flag: language flag
// v_flag: version flag
// r_len: length of remark body
// r_body: data of remark body
// e_len: length of extension fields body
// e_body: data of extension fields
//
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// + item | request_code | l_flag | v_flag | opaque | request_flag | r_len | r_body | e_len | e_body +
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// + len | 2bytes | 1byte | 2bytes | 4bytes | 4 bytes | 4 bytes | r_len bytes | 4 bytes | e_len bytes +
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
const (
// headerFixedLength is the combined size in bytes of the fixed-width header
// fields: request_code (2) + l_flag (1) + v_flag (2) + opaque (4) +
// request_flag (4) + r_len (4) + e_len (4). The variable-length remark and
// extension-field bodies are not included.
headerFixedLength = 21
)
// rmqCodec reads and writes command headers in the fixed binary layout of the
// RocketMQ private protocol. All integers are big-endian.
type rmqCodec struct{}

// encodeHeader serializes the header of command in wire order: request code,
// language flag, version, opaque, flag, remark length, remark bytes,
// extension-fields length, extension-fields bytes. The language flag written
// is always the Go code. An empty remark or extension block is represented by
// its zero length prefix alone.
func (c *rmqCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
	extBytes, err := c.encodeMaps(command.ExtFields)
	if err != nil {
		return nil, err
	}

	size := headerFixedLength + len(command.Remark) + len(extBytes)
	out := bytes.NewBuffer(make([]byte, 0, size))

	fixed := []interface{}{
		int16(command.Code),        // request code, 2 bytes
		_Go,                        // language flag, 1 byte
		int16(command.Version),     // version flag, 2 bytes
		command.Opaque,             // request identifier, 4 bytes
		command.Flag,               // request flag, 4 bytes
		int32(len(command.Remark)), // remark length, 4 bytes
	}
	for _, field := range fixed {
		if err := binary.Write(out, binary.BigEndian, field); err != nil {
			return nil, err
		}
	}
	out.WriteString(command.Remark)

	// extension fields: 4-byte length followed by the pre-encoded records
	if err := binary.Write(out, binary.BigEndian, int32(len(extBytes))); err != nil {
		return nil, err
	}
	out.Write(extBytes)

	return out.Bytes(), nil
}
// encodeMaps serializes maps as consecutive records of
//
//	2-byte key length | key bytes | 4-byte value length | value bytes
//
// with big-endian lengths. Records follow Go's map iteration order, so their
// order in the output is unspecified. A nil or empty map yields an empty,
// non-nil slice.
//
// It returns an error when a key or value is too long for its length prefix;
// without that check the length would wrap around and the peer would read a
// corrupt header.
func (c *rmqCodec) encodeMaps(maps map[string]string) ([]byte, error) {
	if len(maps) == 0 {
		return []byte{}, nil
	}

	const (
		maxKeyLen   = 1<<15 - 1 // largest value of the signed 16-bit key length
		maxValueLen = 1<<31 - 1 // largest value of the signed 32-bit value length
	)

	// First pass: validate lengths and compute the exact output size so the
	// buffer is allocated once.
	size := 0
	for key, value := range maps {
		if len(key) > maxKeyLen {
			return nil, fmt.Errorf("ext field key length %d exceeds the limit %d", len(key), maxKeyLen)
		}
		if len(value) > maxValueLen {
			return nil, fmt.Errorf("ext field value length %d exceeds the limit %d", len(value), maxValueLen)
		}
		size += 2 + len(key) + 4 + len(value)
	}

	extFieldsBuf := bytes.NewBuffer(make([]byte, 0, size))
	var prefix [4]byte
	for key, value := range maps {
		binary.BigEndian.PutUint16(prefix[:2], uint16(len(key)))
		extFieldsBuf.Write(prefix[:2])
		extFieldsBuf.WriteString(key)

		binary.BigEndian.PutUint32(prefix[:], uint32(len(value)))
		extFieldsBuf.Write(prefix[:])
		extFieldsBuf.WriteString(value)
	}
	return extFieldsBuf.Bytes(), nil
}
// decodeHeader parses a header in the RocketMQ binary layout: request code
// (2 bytes), language flag (1), version (2), opaque (4), flag (4), remark
// length (4) plus remark bytes, then extension-fields length (4) plus the
// extension records. All integers are big-endian.
//
// A non-positive remark or extension length means the section is absent; in
// that case Remark stays empty and ExtFields stays nil. Every length taken
// from the input is checked against the bytes actually remaining before it is
// used, so malformed input yields an error rather than a panic or an
// allocation sized by the peer.
func (c *rmqCodec) decodeHeader(data []byte) (*RemotingCommand, error) {
	command := &RemotingCommand{}
	buf := bytes.NewBuffer(data)

	// Fixed-width prefix, in wire order.
	var languageCode byte
	fixed := []interface{}{
		&command.Code,
		&languageCode,
		&command.Version,
		&command.Opaque,
		&command.Flag,
	}
	for _, field := range fixed {
		if err := binary.Read(buf, binary.BigEndian, field); err != nil {
			return nil, err
		}
	}
	command.Language = LanguageCode(languageCode)

	// String remark
	var remarkLen int32
	if err := binary.Read(buf, binary.BigEndian, &remarkLen); err != nil {
		return nil, err
	}
	if remarkLen > 0 {
		if int(remarkLen) > buf.Len() {
			return nil, fmt.Errorf("remark length %d exceeds remaining %d bytes", remarkLen, buf.Len())
		}
		command.Remark = string(buf.Next(int(remarkLen)))
	}

	// Extension fields
	var extFieldsLen int32
	if err := binary.Read(buf, binary.BigEndian, &extFieldsLen); err != nil {
		return nil, err
	}
	if extFieldsLen <= 0 {
		return command, nil
	}
	if int(extFieldsLen) > buf.Len() {
		return nil, fmt.Errorf("ext fields length %d exceeds remaining %d bytes", extFieldsLen, buf.Len())
	}

	// buf is not touched again, so the slice returned by Next stays valid
	// while the records are parsed.
	extBuf := bytes.NewBuffer(buf.Next(int(extFieldsLen)))
	command.ExtFields = make(map[string]string)
	for extBuf.Len() > 0 {
		var kLen int16
		if err := binary.Read(extBuf, binary.BigEndian, &kLen); err != nil {
			return nil, err
		}
		// A negative length would panic when used as a slice size.
		if kLen < 0 || int(kLen) > extBuf.Len() {
			return nil, fmt.Errorf("invalid ext field key length %d with %d bytes remaining", kLen, extBuf.Len())
		}
		key, err := getExtFieldsData(extBuf, int32(kLen))
		if err != nil {
			return nil, err
		}

		var vLen int32
		if err := binary.Read(extBuf, binary.BigEndian, &vLen); err != nil {
			return nil, err
		}
		if vLen < 0 || int(vLen) > extBuf.Len() {
			return nil, fmt.Errorf("invalid ext field value length %d with %d bytes remaining", vLen, extBuf.Len())
		}
		value, err := getExtFieldsData(extBuf, vLen)
		if err != nil {
			return nil, err
		}
		command.ExtFields[key] = value
	}
	return command, nil
}
// getExtFieldsData consumes length bytes from buff and returns them as a
// string. It returns an error, consuming nothing, when length is negative or
// larger than the number of unread bytes; the check happens before any
// allocation so a bogus length from the wire can neither panic nor reserve
// memory.
func getExtFieldsData(buff *bytes.Buffer, length int32) (string, error) {
	if length < 0 {
		return "", fmt.Errorf("invalid ext field length %d", length)
	}
	if int(length) > buff.Len() {
		return "", fmt.Errorf("ext field length %d exceeds remaining %d bytes", length, buff.Len())
	}
	// string(...) copies, so the result does not alias the buffer's storage.
	return string(buff.Next(int(length))), nil
}