blob: 8c3d8be3a362932c008ddd7187e32a9883d9f0ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dubbo
import (
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/utils"
)
//Constants for request and response attributes
const (
HeaderLength = 16
Magic = 0xdabb
MagicHigh = byte(0xda)
MagicLow = byte(0xbb)
FlagRequest = byte(0x80)
FlagTwoWay = byte(0x40)
FlagEvent = byte(0x20)
SerializationMask = byte(0x1f)
HeartBeatEvent = ""
)
//Constants for dubbo attributes
const (
DubboVersionKey string = "dubbo"
DubboVersion string = "2.0.0"
PathKey string = "path"
InterfaceKey string = "interface"
VersionKey string = "version"
CommaSeparator string = ","
FileSeparator string = "/"
SemicolonSeparator string = ";"
)
//Constants
const (
Success = 0
NeedMore = -1
InvalidFragement = -2
InvalidSerialization = -3
)
//serialise type
const (
Hessian2 = byte(2)
)
//DubboCodec is a struct
type DubboCodec struct {
}
//GetContentTypeID is a method which returns content type id
func (p *DubboCodec) GetContentTypeID() byte {
return Hessian2
}
//EncodeDubboRsp is a method which encodes dubbo response
func (p *DubboCodec) EncodeDubboRsp(rsp *DubboRsp, buffer *util.WriteBuffer) int {
// set Magic number.
header := make([]byte, HeaderLength)
// set Magic number.
util.Short2bytes(Magic, header, 0)
// set request and serialization flag.
header[2] = p.GetContentTypeID()
if rsp.IsHeartbeat() {
header[2] |= FlagEvent
}
// set response status.
status := rsp.GetStatus()
header[3] = status
// set request id.
util.Long2bytes(rsp.GetID(), header, 4)
buffer.WriteIndex(HeaderLength)
if status == Ok {
if rsp.IsHeartbeat() {
//encodeHeartbeatData
ret := rsp.GetValue()
buffer.WriteObject(ret)
} else {
//encodeResponseData
except := rsp.GetException()
if except == nil {
ret := rsp.GetValue()
if ret == nil {
buffer.WriteByte(ResponseNullValue)
} else {
//buffer.WriteByte(ResponseValue)
if v, ok := ret.([]byte); ok {
buffer.WriteBytes(v)
} else {
buffer.WriteObject(ret)
}
}
} else {
buffer.WriteByte(ResponseWithException)
buffer.WriteObject(except)
}
}
} else {
if rsp.GetErrorMsg() == "" {
buffer.WriteByte(ResponseNullValue)
} else {
buffer.WriteObject(rsp.GetErrorMsg())
}
}
len := buffer.WrittenBytes() - HeaderLength
util.Int2bytes(len, header, 12)
buffer.WriteIndex(0)
buffer.WriteBytes(header)
buffer.WriteIndex(HeaderLength + len)
return 0
}
//DecodeDubboRsqHead is a method which decodes dubbo response header
func (p *DubboCodec) DecodeDubboRsqHead(rsp *DubboRsp, header []byte, bodyLen *int) int {
if header[0] != MagicHigh || header[1] != MagicLow {
return InvalidFragement
}
//读取请求ID
var id int64 = util.Bytes2long(header, 4)
rsp.SetID(id)
var flag = header[2]
if (flag & FlagEvent) != 0 {
rsp.SetEvent(true)
}
proto := byte(flag & SerializationMask)
if proto != Hessian2 { //当前只支持hessian2编码
return InvalidSerialization
}
status := header[3]
rsp.SetStatus(status)
//读取长度
*bodyLen = int(util.Bytes2int(header, 12))
return Success
}
//DecodeDubboRspBody is a method which decodes dubbo response body
func (p *DubboCodec) DecodeDubboRspBody(buffer *util.ReadBuffer, rsp *DubboRsp) int {
var obj interface{}
var err error
if rsp.IsHeartbeat() {
rsp.SetValue(HeartBeatEvent)
}
//获取状态
if rsp.GetStatus() == Ok {
if rsp.IsHeartbeat() && (HeartBeatEvent == rsp.GetValue()) {
//decodeHeartbeatData
obj, err = buffer.ReadObject()
if err != nil {
rsp.SetStatus(ServerError)
rsp.SetErrorMsg(err.Error())
return 0
}
} else if rsp.mEvent {
//decodeEventData
obj, err = buffer.ReadObject()
if err != nil {
rsp.SetStatus(ServerError)
rsp.SetErrorMsg(err.Error())
return 0
}
} else {
//decodeResult
var valueType byte = buffer.ReadByte()
switch valueType {
case ResponseNullValue:
//do nothing
rsp.SetValue(nil)
return 0
case ResponseValue:
obj, err = buffer.ReadObject()
if err != nil {
rsp.SetStatus(ServerError)
rsp.SetErrorMsg(err.Error())
return -1
}
case ResponseWithException:
//readObject,设置异常
rsp.SetStatus(ServiceError)
obj, err = buffer.ReadObject()
if err != nil {
rsp.SetStatus(ServerError)
rsp.SetErrorMsg(err.Error())
return 0
}
}
}
rsp.SetValue(buffer.GetBuf())
//rsp.SetValue(obj)
} else {
obj, err = buffer.ReadObject()
if err != nil {
rsp.SetErrorMsg(err.Error())
} else {
if s, ok := obj.(string); !ok {
rsp.SetErrorMsg("unknown error")
} else {
rsp.SetErrorMsg(s)
}
}
}
return 0
}
//EncodeDubboReq is a method which encodes dubbo request
func (p *DubboCodec) EncodeDubboReq(req *Request, buffer *util.WriteBuffer) int {
// set Magic number.
header := make([]byte, HeaderLength)
util.Short2bytes(Magic, header, 0)
// set request and serialization flag.
header[2] = (byte)(FlagRequest | p.GetContentTypeID())
if req.IsHeartbeat() {
header[2] |= FlagEvent
}
if req.IsEvent() {
header[2] |= FlagEvent
}
if req.IsTwoWay() {
header[2] |= FlagTwoWay
}
status := req.GetStatus()
header[3] = status
// set request id.
util.Long2bytes(req.GetMsgID(), header, 4)
if buffer.WriteIndex(HeaderLength) != nil {
return -1
}
if v, ok := req.GetData().([]byte); ok {
buffer.WriteBytes(v)
}
buffer.WriteObject(req.GetAttachments())
len := buffer.WrittenBytes() - HeaderLength
util.Int2bytes(len, header, 12)
buffer.WriteIndex(0)
buffer.WriteBytes(header)
buffer.WriteIndex(HeaderLength + len)
return 0
}
//DecodeDubboReqBodyForRegstry is a method which decodes dubbo request body from registry
func (p *DubboCodec) DecodeDubboReqBodyForRegstry(req *Request, bodyBuf *util.ReadBuffer) int {
var obj interface{}
var err error
if req.IsHeartbeat() {
//decodeHeartbeatData
obj, err = bodyBuf.ReadObject()
if err != nil {
req.SetData(err.Error())
req.SetBroken(true)
return -1
}
} else if req.IsEvent() {
//decodeEventData
obj, err = bodyBuf.ReadObject()
if err != nil {
req.SetData(err.Error())
req.SetBroken(true)
return -1
}
} else {
req.SetAttachment(DubboVersionKey, bodyBuf.ReadString())
req.SetAttachment(PathKey, bodyBuf.ReadString())
req.SetAttachment(VersionKey, bodyBuf.ReadString())
req.SetVersion(req.GetAttachment(VersionKey, ""))
req.SetMethodName(bodyBuf.ReadString())
//解析参数
typeDesc := string(bodyBuf.ReadString())
agrsArry := util.TypeDesToArgsObjArry(typeDesc)
if typeDesc == "" {
agrsArry = nil
} else {
size := len(agrsArry)
if req.GetMethodName() == "subscribe" {
size = 1
}
for i := 0; i < size; i++ {
val, err := bodyBuf.ReadObject()
if err != nil {
req.SetBroken(true)
req.SetData(err.Error())
return -1
} else {
agrsArry[i].SetValue(val)
}
}
req.SetArguments(agrsArry)
}
if err == nil {
req.SetAttachments(nil)
} else {
req.SetBroken(true)
req.SetData(err.Error())
return -1
}
req.SetBroken(false)
req.SetData(obj)
}
return 0
}
//DecodeDubboReqBody is a method which decodes dobbo request body
func (p *DubboCodec) DecodeDubboReqBody(req *Request, bodyBuf *util.ReadBuffer) int {
var obj interface{}
var err error
if req.IsHeartbeat() {
//decodeHeartbeatData
obj, err = bodyBuf.ReadObject()
if err != nil {
req.SetData(err.Error())
req.SetBroken(true)
return -1
}
} else if req.IsEvent() {
//decodeEventData
obj, err = bodyBuf.ReadObject()
if err != nil {
req.SetData(err.Error())
req.SetBroken(true)
return -1
}
} else {
req.SetAttachment(DubboVersionKey, bodyBuf.ReadString())
req.SetAttachment(PathKey, bodyBuf.ReadString())
req.SetAttachment(VersionKey, bodyBuf.ReadString())
req.SetVersion(req.GetAttachment(VersionKey, ""))
req.SetMethodName(bodyBuf.ReadString())
obj = bodyBuf.GetBuf()
req.SetBroken(false)
req.SetData(obj)
}
return 0
}
//DecodeDubboReqHead is a method which decodes dubbo request header
func (p *DubboCodec) DecodeDubboReqHead(req *Request, header []byte, bodyLen *int) int {
if len(header) < HeaderLength {
return NeedMore
}
//读取Magic
if header[0] != MagicHigh || header[1] != MagicLow {
return InvalidFragement
}
//读取请求ID
var id = util.Bytes2long(header, 4)
var flag = header[2]
proto := byte(flag & SerializationMask)
if proto != Hessian2 { //当前只支持hessian2编码
return InvalidSerialization
}
if (flag & FlagRequest) == 0 {
return InvalidFragement
}
req.SetMsgID(id)
req.SetVersion(DubboVersion)
req.SetTwoWay((flag & FlagTwoWay) != 0)
if (flag & FlagEvent) != 0 {
req.SetEvent(HeartBeatEvent)
}
//读取长度
*bodyLen = int(util.Bytes2int(header, 12))
return Success
}