blob: d06a0842749f9051a9e3bba9d9d3eabd936a5f24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import (
"fmt"
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/dubbo"
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/proxy"
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/utils"
"github.com/go-chassis/openlog"
"net"
"sync"
)
//SndTask is a struct
type SndTask struct{}
//Svc is a method
func (this SndTask) Svc(arg interface{}) interface{} {
dubboConn := arg.(*DubboConnection)
dubboConn.MsgSndLoop()
return nil
}
//RecvTask is a struct
type RecvTask struct {
}
//Svc is a method
func (this RecvTask) Svc(arg interface{}) interface{} {
dubboConn := arg.(*DubboConnection)
dubboConn.MsgRecvLoop()
return nil
}
//ProcessTask is a struct
type ProcessTask struct {
conn *DubboConnection
req *dubbo.Request
bufBody []byte
}
//Svc is a method
func (this ProcessTask) Svc(arg interface{}) interface{} {
if this.conn != nil {
this.conn.ProcessBody(this.req, this.bufBody)
}
return nil
}
//DubboConnection is a struct which has attributes for dubbo connection
type DubboConnection struct {
msgque *util.MsgQueue
remoteAddr string
conn *net.TCPConn
codec dubbo.DubboCodec
mtx sync.Mutex
routineMgr *util.RoutineManager
closed bool
}
//NewDubboConnetction is a function to create new dubbo connection
func NewDubboConnetction(conn *net.TCPConn, routineMgr *util.RoutineManager) *DubboConnection {
tmp := new(DubboConnection)
tmp.conn = conn
tmp.codec = dubbo.DubboCodec{}
tmp.msgque = util.NewMsgQueue()
tmp.remoteAddr = conn.RemoteAddr().String()
tmp.closed = false
if routineMgr == nil {
tmp.routineMgr = util.NewRoutineManager()
}
return tmp
}
//Open is a function to open a connection
func (this *DubboConnection) Open() {
this.routineMgr.Spawn(SndTask{}, this, fmt.Sprintf("Snd-%s->%s", this.conn.LocalAddr().String(), this.conn.RemoteAddr().String()))
this.routineMgr.Spawn(RecvTask{}, this, fmt.Sprintf("Recv-%s->%s", this.conn.LocalAddr().String(), this.conn.RemoteAddr().String()))
}
//Close is a function to close a connection
func (this *DubboConnection) Close() {
this.mtx.Lock()
defer this.mtx.Unlock()
if this.closed {
return
}
this.closed = true
this.msgque.Deavtive()
this.conn.Close()
}
//MsgRecvLoop is a method receive data
func (this *DubboConnection) MsgRecvLoop() {
//通知处理应答消息
for {
//先处理消息头
buf := make([]byte, dubbo.HeaderLength)
size, err := this.conn.Read(buf)
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
openlog.Error("Dubbo server Recv head: " + err.Error())
continue
}
openlog.Error("Dubbo server Recv head: " + err.Error())
break
}
if size < dubbo.HeaderLength {
openlog.Info("Invalid msg head")
continue
}
req := new(dubbo.Request)
bodyLen := 0
ret := this.codec.DecodeDubboReqHead(req, buf, &bodyLen)
if ret != dubbo.Success {
openlog.Info("Invalid msg head")
continue
}
body := make([]byte, bodyLen)
count := 0
for {
redBuff := body[count:]
size, err = this.conn.Read(redBuff)
if err != nil {
//通知关闭连接
openlog.Error("Recv: " + err.Error())
goto exitloop
}
count += size
if count == bodyLen {
break
}
}
this.routineMgr.Spawn(ProcessTask{this, req, body}, nil, fmt.Sprintf("ProcessTask-%d", req.GetMsgID()))
}
exitloop:
this.Close()
}
//ProcessBody is a method to process the body of response
func (this *DubboConnection) ProcessBody(req *dubbo.Request, bufBody []byte) {
var buffer util.ReadBuffer
buffer.SetBuffer(bufBody)
this.codec.DecodeDubboReqBody(req, &buffer)
this.HandleMsg(req)
}
//HandleMsg is a method
func (this *DubboConnection) HandleMsg(req *dubbo.Request) {
//这里发送Rest请求以及收发送应答
ctx := &dubbo.InvokeContext{req, &dubbo.DubboRsp{}, nil, "", this.remoteAddr}
ctx.Rsp.Init()
ctx.Rsp.SetID(req.GetMsgID())
if req.IsHeartbeat() {
ctx.Rsp.SetValue(nil)
ctx.Rsp.SetEvent(true)
ctx.Rsp.SetStatus(dubbo.Ok)
} else {
//这里重新分配MSGID
srcMsgID := ctx.Req.GetMsgID()
dstMsgID := dubbo.GenerateMsgID()
//openlog.Info(fmt.Sprintf("dubbo2dubbo srcMsgID=%d, newMsgID=%d", srcMsgID, dstMsgID))
ctx.Req.SetMsgID(dstMsgID)
err := dubboproxy.Handle(ctx)
if err != nil {
ctx.Rsp.SetErrorMsg(err.Error())
openlog.Error("request: " + err.Error())
ctx.Rsp.SetStatus(dubbo.ServerError)
}
ctx.Req.SetMsgID(srcMsgID)
ctx.Rsp.SetID(srcMsgID)
}
if req.IsTwoWay() {
this.msgque.Enqueue(ctx.Rsp)
}
}
//MsgSndLoop is a method to send data
func (this *DubboConnection) MsgSndLoop() {
for {
msg, err := this.msgque.Dequeue()
if err != nil {
openlog.Error("MsgSndLoop Dequeue: " + err.Error())
break
}
var buffer util.WriteBuffer
buffer.Init(0)
this.codec.EncodeDubboRsp(msg.(*dubbo.DubboRsp), &buffer)
bs := buffer.GetValidData()
_, err = this.conn.Write(bs /*buffer.GetValidData()*/)
if err != nil {
openlog.Error("Send exception: " + err.Error())
break
}
}
this.Close()
}