blob: e5acff50515515beb778f1cfaeceeb76b70c6e83 [file] [log] [blame]
package server
import (
"bytes"
"github.com/dubbogo/getty"
"github.com/pkg/errors"
"github.com/dk-lockdown/seata-golang/protocal"
"github.com/dk-lockdown/seata-golang/protocal/codec"
"vimagination.zapto.org/byteio"
)
/**
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
* | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestId |
* | code |colVer| (head+body) | Length |Type |lizer|ess | |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | |
* | Head Map [Optional] |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | |
* | body |
* | |
* | ... ... |
* +-----------------------------------------------------------------------------------------------+
* </pre>
* <p>
* <li>Full Length: include all data </li>
* <li>Head Length: include head data from magic code to head map. </li>
* <li>Body Length: Full Length - Head Length</li>
* </p>
* https://github.com/seata/seata/issues/893
*/
var (
RpcServerPkgHandler = &RpcServerPackageHandler{}
)
type RpcServerPackageHandler struct{}
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
r := byteio.BigEndianReader{Reader:bytes.NewReader(data)}
b0,_ := r.ReadByte()
b1,_ := r.ReadByte()
if b0 != protocal.MAGIC_CODE_BYTES[0] || b1 != protocal.MAGIC_CODE_BYTES[1] {
return nil,0,errors.Errorf("Unknown magic code: %b,%b",b0,b1)
}
r.ReadByte()
// TODO check version compatible here
fullLength,_,_ := r.ReadInt32()
headLength,_,_ := r.ReadInt16()
messageType,_ := r.ReadByte()
codecType,_ := r.ReadByte()
compressorType,_ := r.ReadByte()
requestId,_,_ := r.ReadInt32()
rpcMessage := protocal.RpcMessage{
Codec:codecType,
Id:requestId,
Compressor:compressorType,
MessageType:messageType,
}
headMapLength := headLength - protocal.V1_HEAD_LENGTH
if headMapLength > 0 {
rpcMessage.HeadMap = headMapDecode(data[protocal.V1_HEAD_LENGTH+1:headMapLength])
}
if messageType == protocal.MSGTYPE_HEARTBEAT_REQUEST {
rpcMessage.Body = protocal.HeartBeatMessagePing
} else if messageType == protocal.MSGTYPE_HEARTBEAT_RESPONSE {
rpcMessage.Body = protocal.HeartBeatMessagePong
} else {
bodyLength := fullLength - int32(headLength)
if bodyLength > 0 {
//todo compress
msg,_ := codec.MessageDecoder(codecType,data[headLength:])
rpcMessage.Body = msg
}
}
return rpcMessage, int(fullLength), nil
}
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
var result = make([]byte,0)
msg := pkg.(protocal.RpcMessage)
fullLength := protocal.V1_HEAD_LENGTH
headLength := protocal.V1_HEAD_LENGTH
var b bytes.Buffer
w := byteio.BigEndianWriter{Writer: &b}
result = append(result, protocal.MAGIC_CODE_BYTES[:2]...)
result = append(result, protocal.VERSION)
w.WriteByte(msg.MessageType)
w.WriteByte(msg.Codec)
w.WriteByte(msg.Compressor)
w.WriteInt32(msg.Id)
if msg.HeadMap != nil && len(msg.HeadMap) > 0 {
headMapBytes,headMapLength := headMapEncode(msg.HeadMap)
headLength += headMapLength
fullLength += headMapLength
w.Write(headMapBytes)
}
if msg.MessageType != protocal.MSGTYPE_HEARTBEAT_REQUEST &&
msg.MessageType != protocal.MSGTYPE_HEARTBEAT_RESPONSE {
bodyBytes := codec.MessageEncoder(msg.Codec,msg.Body)
fullLength += len(bodyBytes)
w.Write(bodyBytes)
}
fullLen := int32(fullLength)
headLen := int16(headLength)
result = append(result, []byte{ byte(fullLen>>26),byte(fullLen>>16),byte(fullLen>>8),byte(fullLen) }...)
result = append(result, []byte{ byte(headLen>>8),byte(headLen) }...)
result = append(result,b.Bytes()...)
return result, nil
}
func headMapDecode(data []byte) map[string]string {
mp := make(map[string]string)
size := len(data)
if size == 0 {
return mp
}
r := byteio.BigEndianReader{Reader:bytes.NewReader(data)}
readLength := 0
for {
if readLength >= size { break }
var key, value string
lengthK,_,_ := r.ReadUint16()
if lengthK < 0 {
break
} else if lengthK == 0 {
key = ""
} else {
key,_,_ = r.ReadString(int(lengthK))
}
lengthV,_,_ := r.ReadUint16()
if lengthV < 0 {
break
} else if lengthV == 0 {
value = ""
} else {
value,_,_ = r.ReadString(int(lengthV))
}
mp[key] = value
readLength += int(lengthK + lengthV)
}
return mp
}
func headMapEncode(data map[string]string) ([]byte,int) {
var b bytes.Buffer
w := byteio.BigEndianWriter{Writer: &b}
for k,v := range data{
if k == "" {
w.WriteUint16(0)
} else {
w.WriteUint16(uint16(len(k)))
w.WriteString(k)
}
if v == "" {
w.WriteUint16(0)
} else {
w.WriteUint16(uint16(len(v)))
w.WriteString(v)
}
}
return b.Bytes(),b.Len()
}