dubbo 协议修改(不再解析原始数据,直接透传) (#40)
* dubbo 协议修改(不再解析原始数据,直接透传)
* dubbo protocol modify
diff --git a/protocol/dubbo/client/chassis/dubbo_chassis_client.go b/protocol/dubbo/client/chassis/dubbo_chassis_client.go
index 5181cbb..44de38d 100644
--- a/protocol/dubbo/client/chassis/dubbo_chassis_client.go
+++ b/protocol/dubbo/client/chassis/dubbo_chassis_client.go
@@ -48,7 +48,6 @@
//NewDubboChassisClient create new client
func NewDubboChassisClient(options client.Options) (client.ProtocolClient, error) {
-
rc := &dubboChassisClient{
once: sync.Once{},
opts: options,
@@ -72,7 +71,7 @@
if endPoint == "" {
return &util.BaseError{" The endpoint is empty"}
}
- lager.Logger.Info("Dubbo invoke endPont: " + endPoint)
+
dubboCli, err := dubboClient.CachedClients.GetClient(endPoint)
if err != nil {
lager.Logger.Errorf("Invalid Request addr %s %s", endPoint, err)
diff --git a/protocol/dubbo/client/client_conn.go b/protocol/dubbo/client/client_conn.go
index 2d363fb..fe42b48 100644
--- a/protocol/dubbo/client/client_conn.go
+++ b/protocol/dubbo/client/client_conn.go
@@ -112,7 +112,6 @@
//通知处理应答消息
for {
//先处理消息头
-
buf := make([]byte, dubbo.HeaderLength)
size, err := this.conn.Read(buf)
if err != nil {
diff --git a/protocol/dubbo/dubbo/codec.go b/protocol/dubbo/dubbo/codec.go
index 26b0b89..b98f05c 100644
--- a/protocol/dubbo/dubbo/codec.go
+++ b/protocol/dubbo/dubbo/codec.go
@@ -98,8 +98,13 @@
if ret == nil {
buffer.WriteByte(ResponseNullValue)
} else {
- buffer.WriteByte(ResponseValue)
- buffer.WriteObject(ret)
+ //buffer.WriteByte(ResponseValue)
+ if v, ok := ret.([]byte); ok {
+ buffer.WriteBytes(v)
+ } else {
+ buffer.WriteObject(ret)
+ }
+
}
} else {
buffer.WriteByte(ResponseWithException)
@@ -127,7 +132,6 @@
//DecodeDubboRsqHead is a method which decodes dubbo response header
func (p *DubboCodec) DecodeDubboRsqHead(rsp *DubboRsp, header []byte, bodyLen *int) int {
-
if header[0] != MagicHigh || header[1] != MagicLow {
return InvalidFragement
}
@@ -155,7 +159,6 @@
func (p *DubboCodec) DecodeDubboRspBody(buffer *util.ReadBuffer, rsp *DubboRsp) int {
var obj interface{}
var err error
-
if rsp.IsHeartbeat() {
rsp.SetValue(HeartBeatEvent)
}
@@ -203,7 +206,8 @@
}
}
}
- rsp.SetValue(obj)
+ rsp.SetValue(buffer.GetBuf())
+ //rsp.SetValue(obj)
} else {
obj, err = buffer.ReadObject()
if err != nil {
@@ -245,30 +249,9 @@
return -1
}
- //写入dubbo version
- buffer.WriteObject(req.GetAttachment(DubboVersionKey, DubboVersion))
- //写入path key
- buffer.WriteObject(req.GetAttachment(PathKey, ""))
- //写入接口version key
- buffer.WriteObject(req.GetAttachment(VersionKey, "0.0.0"))
- //写入方法名称
- buffer.WriteObject(req.GetMethodName())
- //写入参数类型列表
- buffer.WriteObject(util.GetJavaDesc(req.GetArguments()))
- //写入参数列表
- var argObjs []util.Argument
- argObjs = req.GetArguments()
- var err error
- if argObjs != nil {
- size := len(argObjs)
- for i := 0; i < size; i++ {
- err = buffer.WriteObject(argObjs[i].GetValue())
- if err != nil {
- return -1
- }
- }
+ if v, ok := req.GetData().([]byte); ok {
+ buffer.WriteBytes(v)
}
- //写入attatchmanets
buffer.WriteObject(req.GetAttachments())
len := buffer.WrittenBytes() - HeaderLength
@@ -370,33 +353,8 @@
req.SetAttachment(VersionKey, bodyBuf.ReadString())
req.SetVersion(req.GetAttachment(VersionKey, ""))
req.SetMethodName(bodyBuf.ReadString())
- //解析参数
- typeDesc := string(bodyBuf.ReadString())
- agrsArry := util.TypeDesToArgsObjArry(typeDesc)
- if typeDesc == "" {
- agrsArry = nil
- } else {
- size := len(agrsArry)
- for i := 0; i < size; i++ {
- val, err := bodyBuf.ReadObject()
- if err != nil {
- req.SetBroken(true)
- req.SetData(err.Error())
- return -1
- } else {
- agrsArry[i].SetValue(val)
- }
- }
- req.SetArguments(agrsArry)
- }
- attatchments, err := bodyBuf.ReadMap()
- if err == nil {
- req.SetAttachments(attatchments)
- } else {
- req.SetBroken(true)
- req.SetData(err.Error())
- return -1
- }
+
+ obj = bodyBuf.GetBuf()
req.SetBroken(false)
req.SetData(obj)
}
diff --git a/protocol/dubbo/proxy/dubbo_proxy_ouput.go b/protocol/dubbo/proxy/dubbo_proxy_ouput.go
index 919bb5b..2b99c6f 100755
--- a/protocol/dubbo/proxy/dubbo_proxy_ouput.go
+++ b/protocol/dubbo/proxy/dubbo_proxy_ouput.go
@@ -21,11 +21,13 @@
"context"
"encoding/json"
"fmt"
+ "github.com/go-mesh/mesher/cmd"
"net/http"
"net/url"
"github.com/go-chassis/go-chassis/client/rest"
"github.com/go-chassis/go-chassis/core/common"
+ chassisCommon "github.com/go-chassis/go-chassis/core/common"
chassisconfig "github.com/go-chassis/go-chassis/core/config"
"github.com/go-chassis/go-chassis/core/handler"
"github.com/go-chassis/go-chassis/core/invocation"
@@ -36,7 +38,6 @@
"github.com/go-chassis/go-chassis/pkg/util/httputil"
"github.com/go-chassis/go-chassis/pkg/util/tags"
"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
- "github.com/go-mesh/mesher/cmd"
mesherCommon "github.com/go-mesh/mesher/common"
mesherRuntime "github.com/go-mesh/mesher/pkg/runtime"
"github.com/go-mesh/mesher/protocol"
@@ -54,6 +55,9 @@
ProxyTag = "mesherproxy"
)
+//IsProvider is variable of type boolean used for tag proxyed dubbo service as provider(true) or consumer(false)
+var IsProvider bool
+
// DubboListenAddr is a variable of type string used for storing listen address
var DubboListenAddr string
@@ -195,7 +199,7 @@
inv.SourceServiceID = runtime.ServiceID
inv.SourceMicroService = ctx.Req.GetAttachment(common.HeaderSourceName, "")
inv.Args = ctx.Req
-
+ inv.Ctx = context.WithValue(context.Background(), chassisCommon.ContextHeaderKey{}, ctx.Req.GetAttachments())
inv.MicroServiceName = svc.ServiceName
inv.RouteTags = utiltags.NewDefaultTag(svc.Version, svc.AppID)
value := ctx.Req.GetAttachment(ProxyTag, "")
@@ -209,11 +213,10 @@
SetLocalServiceAddress(inv) //select local service
var err error
var c *handler.Chain
-
if inv.Protocol == "dubbo" {
//发送请求
- value := ctx.Req.GetAttachment(ProxyTag, "")
- if value == "" { //come from proxyedDubboSvc
+ //value := ctx.Req.GetAttachment(ProxyTag, "")
+ if !IsProvider { //come from proxyedDubboSvc
ctx.Req.SetAttachment(common.HeaderSourceName, chassisconfig.SelfServiceName)
ctx.Req.SetAttachment(ProxyTag, "true")
@@ -224,7 +227,6 @@
return err
}
}
-
c.Next(inv, func(ir *invocation.Response) error {
return handleDubboRequest(inv, ctx, ir)
})
diff --git a/protocol/dubbo/server/dubbo_conn.go b/protocol/dubbo/server/dubbo_conn.go
index db0dcbd..f1d3728 100644
--- a/protocol/dubbo/server/dubbo_conn.go
+++ b/protocol/dubbo/server/dubbo_conn.go
@@ -177,9 +177,8 @@
//这里重新分配MSGID
srcMsgID := ctx.Req.GetMsgID()
dstMsgID := dubbo.GenerateMsgID()
- lager.Logger.Info(fmt.Sprintf("dubbo2dubbo srcMsgID=%d, newMsgID=%d", srcMsgID, dstMsgID))
+ //lager.Logger.Info(fmt.Sprintf("dubbo2dubbo srcMsgID=%d, newMsgID=%d", srcMsgID, dstMsgID))
ctx.Req.SetMsgID(dstMsgID)
-
err := dubboproxy.Handle(ctx)
if err != nil {
ctx.Rsp.SetErrorMsg(err.Error())
@@ -191,6 +190,7 @@
}
if req.IsTwoWay() {
this.msgque.Enqueue(ctx.Rsp)
+
}
}
@@ -205,7 +205,8 @@
var buffer util.WriteBuffer
buffer.Init(0)
this.codec.EncodeDubboRsp(msg.(*dubbo.DubboRsp), &buffer)
- _, err = this.conn.Write(buffer.GetValidData())
+ bs := buffer.GetValidData()
+ _, err = this.conn.Write(bs /*buffer.GetValidData()*/)
if err != nil {
lager.Logger.Error("Send exception: " + err.Error())
break
diff --git a/protocol/dubbo/simpleRegistry/simple_registry_server.go b/protocol/dubbo/simpleRegistry/simple_registry_server.go
index 5ba1fce..5f9e710 100644
--- a/protocol/dubbo/simpleRegistry/simple_registry_server.go
+++ b/protocol/dubbo/simpleRegistry/simple_registry_server.go
@@ -21,6 +21,7 @@
"github.com/go-chassis/go-chassis/core/lager"
"github.com/go-chassis/go-chassis/core/server"
"github.com/go-mesh/mesher/protocol/dubbo/dubbo"
+ "github.com/go-mesh/mesher/protocol/dubbo/proxy"
"github.com/go-mesh/mesher/protocol/dubbo/utils"
"net"
"sync"
@@ -134,6 +135,7 @@
//SendVoidRespond is a method to send void respose
func SendVoidRespond(conn net.Conn, req *dubbo.Request) {
+ dubboproxy.IsProvider = true
var rsp dubbo.DubboRsp
var wBuf util.WriteBuffer
wBuf = util.WriteBuffer{}
diff --git a/protocol/dubbo/utils/buffer.go b/protocol/dubbo/utils/buffer.go
index f8790c1..0d0d0a6 100644
--- a/protocol/dubbo/utils/buffer.go
+++ b/protocol/dubbo/utils/buffer.go
@@ -163,6 +163,11 @@
b.capacity = capacity
}
+//GetBuf is a method to get buffer
+func (b *ReadBuffer) GetBuf() []byte {
+ return b.buffer
+}
+
//ReadByte is a method to read particular byte from buffer
func (b *ReadBuffer) ReadByte() byte {
var tmp interface{}
diff --git a/protocol/dubbo/utils/thrmgr.go b/protocol/dubbo/utils/thrmgr.go
index 67ad323..f836cfb 100644
--- a/protocol/dubbo/utils/thrmgr.go
+++ b/protocol/dubbo/utils/thrmgr.go
@@ -18,7 +18,6 @@
package util
import (
- "github.com/go-chassis/go-chassis/core/lager"
"sync"
)
@@ -85,7 +84,6 @@
//Spawn is a method which spawns new routine
func (this *RoutineManager) Spawn(task RoutineTask, agrs interface{}, routineName string) {
- lager.Logger.Info("Routine spawn:" + routineName)
this.wg.Add(1)
go this.spawn(task, agrs, routineName)
}
@@ -93,7 +91,6 @@
func (this *RoutineManager) spawn(task RoutineTask, agrs interface{}, routineName string) {
task.Svc(agrs)
this.wg.Done()
- lager.Logger.Info("Routine exit:" + routineName)
}
//Done is a method which tells waitgroup that it's done waiting