blob: c139f3547b2af5196a38be9bc83f49251bfcf043 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"bufio"
"encoding/binary"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
type ProtocolCodec struct {
reader *bufio.Reader
pkgType PackageType
bodyLen int
serializer Serializer
headerRead bool
}
func (c *ProtocolCodec) ReadHeader(header *DubboHeader) error {
var err error
if c.reader.Size() < HEADER_LENGTH {
return hessian.ErrHeaderNotEnough
}
buf, err := c.reader.Peek(HEADER_LENGTH)
if err != nil { // this is impossible
return perrors.WithStack(err)
}
_, err = c.reader.Discard(HEADER_LENGTH)
if err != nil { // this is impossible
return perrors.WithStack(err)
}
//// read header
if buf[0] != MAGIC_HIGH && buf[1] != MAGIC_LOW {
return hessian.ErrIllegalPackage
}
// Header{serialization id(5 bit), event, two way, req/response}
if header.SerialID = buf[2] & SERIAL_MASK; header.SerialID == Zero {
return perrors.Errorf("serialization ID:%v", header.SerialID)
}
flag := buf[2] & FLAG_EVENT
if flag != Zero {
header.Type |= PackageHeartbeat
}
flag = buf[2] & FLAG_REQUEST
if flag != Zero {
header.Type |= PackageRequest
flag = buf[2] & FLAG_TWOWAY
if flag != Zero {
header.Type |= PackageRequest_TwoWay
}
} else {
header.Type |= PackageResponse
header.ResponseStatus = buf[3]
if header.ResponseStatus != Response_OK {
header.Type |= PackageResponse_Exception
}
}
// Header{req id}
header.ID = int64(binary.BigEndian.Uint64(buf[4:]))
// Header{body len}
header.BodyLen = int(binary.BigEndian.Uint32(buf[12:]))
if header.BodyLen < 0 {
return hessian.ErrIllegalPackage
}
c.pkgType = header.Type
c.bodyLen = header.BodyLen
if c.reader.Buffered() < c.bodyLen {
return hessian.ErrBodyNotEnough
}
c.headerRead = true
return perrors.WithStack(err)
}
func (c *ProtocolCodec) EncodeHeader(p DubboPackage) []byte {
header := p.Header
bs := make([]byte, 0)
switch header.Type {
case PackageHeartbeat:
if header.ResponseStatus == Zero {
bs = append(bs, hessian.DubboRequestHeartbeatHeader[:]...)
} else {
bs = append(bs, hessian.DubboResponseHeartbeatHeader[:]...)
}
case PackageResponse:
bs = append(bs, hessian.DubboResponseHeaderBytes[:]...)
if header.ResponseStatus != 0 {
bs[3] = header.ResponseStatus
}
case PackageRequest_TwoWay:
bs = append(bs, hessian.DubboRequestHeaderBytesTwoWay[:]...)
}
bs[2] |= header.SerialID & hessian.SERIAL_MASK
binary.BigEndian.PutUint64(bs[4:], uint64(header.ID))
return bs
}
func (c *ProtocolCodec) Encode(p DubboPackage) ([]byte, error) {
// header
if c.serializer == nil {
return nil, perrors.New("serializer should not be nil")
}
header := p.Header
switch header.Type {
case PackageHeartbeat:
if header.ResponseStatus == Zero {
return packRequest(p, c.serializer)
}
return packResponse(p, c.serializer)
case PackageRequest, PackageRequest_TwoWay:
return packRequest(p, c.serializer)
case PackageResponse:
return packResponse(p, c.serializer)
default:
return nil, perrors.Errorf("Unrecognised message type: %v", header.Type)
}
}
func (c *ProtocolCodec) Decode(p *DubboPackage) error {
if !c.headerRead {
if err := c.ReadHeader(&p.Header); err != nil {
return err
}
}
body, err := c.reader.Peek(p.GetBodyLen())
if err != nil {
return err
}
if p.IsResponseWithException() {
logger.Infof("response with exception: %+v", p.Header)
decoder := hessian.NewDecoder(body)
exception, err := decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
rsp, ok := p.Body.(*ResponsePayload)
if !ok {
return perrors.Errorf("java exception:%s", exception.(string))
}
rsp.Exception = perrors.Errorf("java exception:%s", exception.(string))
return nil
} else if p.IsHeartBeat() {
// heartbeat no need to unmarshal contents
return nil
}
if c.serializer == nil {
return perrors.New("Codec serializer is nil")
}
if p.IsResponse() {
p.Body = &ResponsePayload{
RspObj: remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID)).Reply,
}
}
return c.serializer.Unmarshal(body, p)
}
func (c *ProtocolCodec) SetSerializer(serializer Serializer) {
c.serializer = serializer
}
func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
var (
byteArray []byte
pkgLen int
)
header := p.Header
//////////////////////////////////////////
// byteArray
//////////////////////////////////////////
// magic
switch header.Type {
case PackageHeartbeat:
byteArray = append(byteArray, DubboRequestHeartbeatHeader[:]...)
case PackageRequest_TwoWay:
byteArray = append(byteArray, DubboRequestHeaderBytesTwoWay[:]...)
default:
byteArray = append(byteArray, DubboRequestHeaderBytes[:]...)
}
// serialization id, two way flag, event, request/response flag
// SerialID is id of serialization approach in java dubbo
byteArray[2] |= header.SerialID & SERIAL_MASK
// request id
binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
//////////////////////////////////////////
// body
//////////////////////////////////////////
if p.IsHeartBeat() {
byteArray = append(byteArray, byte('N'))
pkgLen = 1
} else {
body, err := serializer.Marshal(p)
if err != nil {
return nil, err
}
pkgLen = len(body)
if pkgLen > int(DEFAULT_LEN) { // 8M
return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
}
byteArray = append(byteArray, body...)
}
binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen))
return byteArray, nil
}
func packResponse(p DubboPackage, serializer Serializer) ([]byte, error) {
var (
byteArray []byte
)
header := p.Header
hb := p.IsHeartBeat()
// magic
if hb {
byteArray = append(byteArray, DubboResponseHeartbeatHeader[:]...)
} else {
byteArray = append(byteArray, DubboResponseHeaderBytes[:]...)
}
// set serialID, identify serialization types, eg: fastjson->6, hessian2->2
byteArray[2] |= header.SerialID & SERIAL_MASK
// response status
if header.ResponseStatus != 0 {
byteArray[3] = header.ResponseStatus
}
// request id
binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
// body
body, err := serializer.Marshal(p)
if err != nil {
return nil, err
}
pkgLen := len(body)
if pkgLen > int(DEFAULT_LEN) { // 8M
return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
}
// byteArray{body length}
binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen))
byteArray = append(byteArray, body...)
return byteArray, nil
}
func NewDubboCodec(reader *bufio.Reader) *ProtocolCodec {
s, _ := GetSerializerById(constant.S_Hessian2)
return &ProtocolCodec{
reader: reader,
pkgType: 0,
bodyLen: 0,
headerRead: false,
serializer: s.(Serializer),
}
}