dubbo 协议修改(不再解析原始数据,直接透传) (#40)

* dubbo 协议修改(不再解析原始数据,直接透传)

* dubbo protocol modify
diff --git a/protocol/dubbo/client/chassis/dubbo_chassis_client.go b/protocol/dubbo/client/chassis/dubbo_chassis_client.go
index 5181cbb..44de38d 100644
--- a/protocol/dubbo/client/chassis/dubbo_chassis_client.go
+++ b/protocol/dubbo/client/chassis/dubbo_chassis_client.go
@@ -48,7 +48,6 @@
 
 //NewDubboChassisClient create new client
 func NewDubboChassisClient(options client.Options) (client.ProtocolClient, error) {
-
 	rc := &dubboChassisClient{
 		once: sync.Once{},
 		opts: options,
@@ -72,7 +71,7 @@
 	if endPoint == "" {
 		return &util.BaseError{" The endpoint is empty"}
 	}
-	lager.Logger.Info("Dubbo invoke endPont: " + endPoint)
+
 	dubboCli, err := dubboClient.CachedClients.GetClient(endPoint)
 	if err != nil {
 		lager.Logger.Errorf("Invalid Request addr %s %s", endPoint, err)
diff --git a/protocol/dubbo/client/client_conn.go b/protocol/dubbo/client/client_conn.go
index 2d363fb..fe42b48 100644
--- a/protocol/dubbo/client/client_conn.go
+++ b/protocol/dubbo/client/client_conn.go
@@ -112,7 +112,6 @@
 	//通知处理应答消息
 	for {
 		//先处理消息头
-
 		buf := make([]byte, dubbo.HeaderLength)
 		size, err := this.conn.Read(buf)
 		if err != nil {
diff --git a/protocol/dubbo/dubbo/codec.go b/protocol/dubbo/dubbo/codec.go
index 26b0b89..b98f05c 100644
--- a/protocol/dubbo/dubbo/codec.go
+++ b/protocol/dubbo/dubbo/codec.go
@@ -98,8 +98,13 @@
 				if ret == nil {
 					buffer.WriteByte(ResponseNullValue)
 				} else {
-					buffer.WriteByte(ResponseValue)
-					buffer.WriteObject(ret)
+					//buffer.WriteByte(ResponseValue)
+					if v, ok := ret.([]byte); ok {
+						buffer.WriteBytes(v)
+					} else {
+						buffer.WriteObject(ret)
+					}
+
 				}
 			} else {
 				buffer.WriteByte(ResponseWithException)
@@ -127,7 +132,6 @@
 
 //DecodeDubboRsqHead is a method which decodes dubbo response header
 func (p *DubboCodec) DecodeDubboRsqHead(rsp *DubboRsp, header []byte, bodyLen *int) int {
-
 	if header[0] != MagicHigh || header[1] != MagicLow {
 		return InvalidFragement
 	}
@@ -155,7 +159,6 @@
 func (p *DubboCodec) DecodeDubboRspBody(buffer *util.ReadBuffer, rsp *DubboRsp) int {
 	var obj interface{}
 	var err error
-
 	if rsp.IsHeartbeat() {
 		rsp.SetValue(HeartBeatEvent)
 	}
@@ -203,7 +206,8 @@
 				}
 			}
 		}
-		rsp.SetValue(obj)
+		rsp.SetValue(buffer.GetBuf())
+		//rsp.SetValue(obj)
 	} else {
 		obj, err = buffer.ReadObject()
 		if err != nil {
@@ -245,30 +249,9 @@
 		return -1
 	}
 
-	//写入dubbo version
-	buffer.WriteObject(req.GetAttachment(DubboVersionKey, DubboVersion))
-	//写入path key
-	buffer.WriteObject(req.GetAttachment(PathKey, ""))
-	//写入接口version key
-	buffer.WriteObject(req.GetAttachment(VersionKey, "0.0.0"))
-	//写入方法名称
-	buffer.WriteObject(req.GetMethodName())
-	//写入参数类型列表
-	buffer.WriteObject(util.GetJavaDesc(req.GetArguments()))
-	//写入参数列表
-	var argObjs []util.Argument
-	argObjs = req.GetArguments()
-	var err error
-	if argObjs != nil {
-		size := len(argObjs)
-		for i := 0; i < size; i++ {
-			err = buffer.WriteObject(argObjs[i].GetValue())
-			if err != nil {
-				return -1
-			}
-		}
+	if v, ok := req.GetData().([]byte); ok {
+		buffer.WriteBytes(v)
 	}
-	//写入attatchmanets
 	buffer.WriteObject(req.GetAttachments())
 
 	len := buffer.WrittenBytes() - HeaderLength
@@ -370,33 +353,8 @@
 		req.SetAttachment(VersionKey, bodyBuf.ReadString())
 		req.SetVersion(req.GetAttachment(VersionKey, ""))
 		req.SetMethodName(bodyBuf.ReadString())
-		//解析参数
-		typeDesc := string(bodyBuf.ReadString())
-		agrsArry := util.TypeDesToArgsObjArry(typeDesc)
-		if typeDesc == "" {
-			agrsArry = nil
-		} else {
-			size := len(agrsArry)
-			for i := 0; i < size; i++ {
-				val, err := bodyBuf.ReadObject()
-				if err != nil {
-					req.SetBroken(true)
-					req.SetData(err.Error())
-					return -1
-				} else {
-					agrsArry[i].SetValue(val)
-				}
-			}
-			req.SetArguments(agrsArry)
-		}
-		attatchments, err := bodyBuf.ReadMap()
-		if err == nil {
-			req.SetAttachments(attatchments)
-		} else {
-			req.SetBroken(true)
-			req.SetData(err.Error())
-			return -1
-		}
+
+		obj = bodyBuf.GetBuf()
 		req.SetBroken(false)
 		req.SetData(obj)
 	}
diff --git a/protocol/dubbo/proxy/dubbo_proxy_ouput.go b/protocol/dubbo/proxy/dubbo_proxy_ouput.go
index 919bb5b..2b99c6f 100755
--- a/protocol/dubbo/proxy/dubbo_proxy_ouput.go
+++ b/protocol/dubbo/proxy/dubbo_proxy_ouput.go
@@ -21,11 +21,13 @@
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/go-mesh/mesher/cmd"
 	"net/http"
 	"net/url"
 
 	"github.com/go-chassis/go-chassis/client/rest"
 	"github.com/go-chassis/go-chassis/core/common"
+	chassisCommon "github.com/go-chassis/go-chassis/core/common"
 	chassisconfig "github.com/go-chassis/go-chassis/core/config"
 	"github.com/go-chassis/go-chassis/core/handler"
 	"github.com/go-chassis/go-chassis/core/invocation"
@@ -36,7 +38,6 @@
 	"github.com/go-chassis/go-chassis/pkg/util/httputil"
 	"github.com/go-chassis/go-chassis/pkg/util/tags"
 	"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
-	"github.com/go-mesh/mesher/cmd"
 	mesherCommon "github.com/go-mesh/mesher/common"
 	mesherRuntime "github.com/go-mesh/mesher/pkg/runtime"
 	"github.com/go-mesh/mesher/protocol"
@@ -54,6 +55,9 @@
 	ProxyTag = "mesherproxy"
 )
 
+//IsProvider is variable of type boolean used for tag proxyed dubbo service as provider(true) or consumer(false)
+var IsProvider bool
+
 // DubboListenAddr is a variable of type string used for storing listen address
 var DubboListenAddr string
 
@@ -195,7 +199,7 @@
 	inv.SourceServiceID = runtime.ServiceID
 	inv.SourceMicroService = ctx.Req.GetAttachment(common.HeaderSourceName, "")
 	inv.Args = ctx.Req
-
+	inv.Ctx = context.WithValue(context.Background(), chassisCommon.ContextHeaderKey{}, ctx.Req.GetAttachments())
 	inv.MicroServiceName = svc.ServiceName
 	inv.RouteTags = utiltags.NewDefaultTag(svc.Version, svc.AppID)
 	value := ctx.Req.GetAttachment(ProxyTag, "")
@@ -209,11 +213,10 @@
 	SetLocalServiceAddress(inv)                //select local service
 	var err error
 	var c *handler.Chain
-
 	if inv.Protocol == "dubbo" {
 		//发送请求
-		value := ctx.Req.GetAttachment(ProxyTag, "")
-		if value == "" { //come from proxyedDubboSvc
+		//value := ctx.Req.GetAttachment(ProxyTag, "")
+		if !IsProvider { //come from proxyedDubboSvc
 			ctx.Req.SetAttachment(common.HeaderSourceName, chassisconfig.SelfServiceName)
 			ctx.Req.SetAttachment(ProxyTag, "true")
 
@@ -224,7 +227,6 @@
 					return err
 				}
 			}
-
 			c.Next(inv, func(ir *invocation.Response) error {
 				return handleDubboRequest(inv, ctx, ir)
 			})
diff --git a/protocol/dubbo/server/dubbo_conn.go b/protocol/dubbo/server/dubbo_conn.go
index db0dcbd..f1d3728 100644
--- a/protocol/dubbo/server/dubbo_conn.go
+++ b/protocol/dubbo/server/dubbo_conn.go
@@ -177,9 +177,8 @@
 		//这里重新分配MSGID
 		srcMsgID := ctx.Req.GetMsgID()
 		dstMsgID := dubbo.GenerateMsgID()
-		lager.Logger.Info(fmt.Sprintf("dubbo2dubbo srcMsgID=%d, newMsgID=%d", srcMsgID, dstMsgID))
+		//lager.Logger.Info(fmt.Sprintf("dubbo2dubbo srcMsgID=%d, newMsgID=%d", srcMsgID, dstMsgID))
 		ctx.Req.SetMsgID(dstMsgID)
-
 		err := dubboproxy.Handle(ctx)
 		if err != nil {
 			ctx.Rsp.SetErrorMsg(err.Error())
@@ -191,6 +190,7 @@
 	}
 	if req.IsTwoWay() {
 		this.msgque.Enqueue(ctx.Rsp)
+
 	}
 }
 
@@ -205,7 +205,8 @@
 		var buffer util.WriteBuffer
 		buffer.Init(0)
 		this.codec.EncodeDubboRsp(msg.(*dubbo.DubboRsp), &buffer)
-		_, err = this.conn.Write(buffer.GetValidData())
+		bs := buffer.GetValidData()
+		_, err = this.conn.Write(bs /*buffer.GetValidData()*/)
 		if err != nil {
 			lager.Logger.Error("Send exception: " + err.Error())
 			break
diff --git a/protocol/dubbo/simpleRegistry/simple_registry_server.go b/protocol/dubbo/simpleRegistry/simple_registry_server.go
index 5ba1fce..5f9e710 100644
--- a/protocol/dubbo/simpleRegistry/simple_registry_server.go
+++ b/protocol/dubbo/simpleRegistry/simple_registry_server.go
@@ -21,6 +21,7 @@
 	"github.com/go-chassis/go-chassis/core/lager"
 	"github.com/go-chassis/go-chassis/core/server"
 	"github.com/go-mesh/mesher/protocol/dubbo/dubbo"
+	"github.com/go-mesh/mesher/protocol/dubbo/proxy"
 	"github.com/go-mesh/mesher/protocol/dubbo/utils"
 	"net"
 	"sync"
@@ -134,6 +135,7 @@
 
 //SendVoidRespond is a method to send void respose
 func SendVoidRespond(conn net.Conn, req *dubbo.Request) {
+	dubboproxy.IsProvider = true
 	var rsp dubbo.DubboRsp
 	var wBuf util.WriteBuffer
 	wBuf = util.WriteBuffer{}
diff --git a/protocol/dubbo/utils/buffer.go b/protocol/dubbo/utils/buffer.go
index f8790c1..0d0d0a6 100644
--- a/protocol/dubbo/utils/buffer.go
+++ b/protocol/dubbo/utils/buffer.go
@@ -163,6 +163,11 @@
 	b.capacity = capacity
 }
 
+//GetBuf is a method to get buffer
+func (b *ReadBuffer) GetBuf() []byte {
+	return b.buffer
+}
+
 //ReadByte is a method to read particular byte from buffer
 func (b *ReadBuffer) ReadByte() byte {
 	var tmp interface{}
diff --git a/protocol/dubbo/utils/thrmgr.go b/protocol/dubbo/utils/thrmgr.go
index 67ad323..f836cfb 100644
--- a/protocol/dubbo/utils/thrmgr.go
+++ b/protocol/dubbo/utils/thrmgr.go
@@ -18,7 +18,6 @@
 package util
 
 import (
-	"github.com/go-chassis/go-chassis/core/lager"
 	"sync"
 )
 
@@ -85,7 +84,6 @@
 
 //Spawn is a method which spawns new routine
 func (this *RoutineManager) Spawn(task RoutineTask, agrs interface{}, routineName string) {
-	lager.Logger.Info("Routine spawn:" + routineName)
 	this.wg.Add(1)
 	go this.spawn(task, agrs, routineName)
 }
@@ -93,7 +91,6 @@
 func (this *RoutineManager) spawn(task RoutineTask, agrs interface{}, routineName string) {
 	task.Svc(agrs)
 	this.wg.Done()
-	lager.Logger.Info("Routine exit:" + routineName)
 }
 
 //Done is a method which tells waitgroup that it's done waiting