blob: 0251b78a2b0d27a68461c16c284b1af53bcb08aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dubbo
import (
"context"
"fmt"
"net/url"
"sync"
"sync/atomic"
"time"
)
import (
"github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
// todo: WritePkg_Timeout will entry *.yml
const (
// WritePkg_Timeout ...
WritePkg_Timeout = 5 * time.Second
)
var (
errTooManySessions = perrors.New("too many sessions")
)
type rpcSession struct {
session getty.Session
reqNum int32
}
func (s *rpcSession) AddReqNum(num int32) {
atomic.AddInt32(&s.reqNum, num)
}
func (s *rpcSession) GetReqNum() int32 {
return atomic.LoadInt32(&s.reqNum)
}
// //////////////////////////////////////////
// RpcClientHandler
// //////////////////////////////////////////
// RpcClientHandler ...
type RpcClientHandler struct {
conn *gettyRPCClient
}
// NewRpcClientHandler ...
func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
return &RpcClientHandler{conn: client}
}
// OnOpen ...
func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.conn.addSession(session)
return nil
}
// OnError ...
func (h *RpcClientHandler) OnError(session getty.Session, err error) {
logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.conn.removeSession(session)
}
// OnClose ...
func (h *RpcClientHandler) OnClose(session getty.Session) {
logger.Infof("session{%s} is closing......", session.Stat())
h.conn.removeSession(session)
}
// OnMessage ...
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*DubboPackage)
if !ok {
logger.Errorf("illegal package")
return
}
if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
if p.Header.Type&hessian.PackageResponse != 0x00 {
logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
if p.Err != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Err)
}
h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
} else {
logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
p.Header.ResponseStatus = hessian.Response_OK
reply(session, p, hessian.PackageHeartbeat)
}
return
}
logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body)
h.conn.updateSession(session)
pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *p)
return
}
if p.Err != nil {
pendingResponse.err = p.Err
}
pendingResponse.response.atta = p.Body.(*Response).atta
if pendingResponse.callback == nil {
pendingResponse.done <- struct{}{}
} else {
pendingResponse.callback(pendingResponse.GetCallResponse())
}
}
// OnCron ...
func (h *RpcClientHandler) OnCron(session getty.Session) {
rpcSession, err := h.conn.getClientRpcSession(session)
if err != nil {
logger.Errorf("client.getClientSession(session{%s}) = error{%v}",
session.Stat(), perrors.WithStack(err))
return
}
if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
logger.Warnf("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn)
return
}
h.conn.pool.rpcClient.heartbeat(session)
}
// //////////////////////////////////////////
// RpcServerHandler
// //////////////////////////////////////////
// RpcServerHandler ...
type RpcServerHandler struct {
maxSessionNum int
sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
}
// NewRpcServerHandler ...
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
return &RpcServerHandler{
maxSessionNum: maxSessionNum,
sessionTimeout: sessionTimeout,
sessionMap: make(map[getty.Session]*rpcSession),
}
}
// OnOpen ...
func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error
h.rwlock.RLock()
if h.maxSessionNum <= len(h.sessionMap) {
err = errTooManySessions
}
h.rwlock.RUnlock()
if err != nil {
return perrors.WithStack(err)
}
logger.Infof("got session:%s", session.Stat())
h.rwlock.Lock()
h.sessionMap[session] = &rpcSession{session: session}
h.rwlock.Unlock()
return nil
}
// OnError ...
func (h *RpcServerHandler) OnError(session getty.Session, err error) {
logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
// OnClose ...
func (h *RpcServerHandler) OnClose(session getty.Session) {
logger.Infof("session{%s} is closing......", session.Stat())
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
// OnMessage ...
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
h.sessionMap[session].reqNum++
}
h.rwlock.Unlock()
p, ok := pkg.(*DubboPackage)
if !ok {
logger.Errorf("illegal package{%#v}", pkg)
return
}
p.Header.ResponseStatus = hessian.Response_OK
// heartbeat
if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
reply(session, p, hessian.PackageHeartbeat)
return
}
twoway := true
// not twoway
if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 {
twoway = false
}
defer func() {
if e := recover(); e != nil {
p.Header.ResponseStatus = hessian.Response_SERVER_ERROR
if err, ok := e.(error); ok {
logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err))
p.Body = perrors.WithStack(err)
} else if err, ok := e.(string); ok {
logger.Errorf("OnMessage panic: %+v", perrors.New(err))
p.Body = perrors.New(err)
} else {
logger.Errorf("OnMessage panic: %+v, this is impossible.", e)
p.Body = e
}
if !twoway {
return
}
reply(session, p, hessian.PackageResponse)
}
}()
u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}),
common.WithParamsValue(constant.GROUP_KEY, p.Service.Group),
common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface),
common.WithParamsValue(constant.VERSION_KEY, p.Service.Version))
exporter, _ := dubboProtocol.ExporterMap().Load(u.ServiceKey())
if exporter == nil {
err := fmt.Errorf("don't have this exporter, key: %s", u.ServiceKey())
logger.Errorf(err.Error())
p.Header.ResponseStatus = hessian.Response_OK
p.Body = err
reply(session, p, hessian.PackageResponse)
return
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker != nil {
attachments := p.Body.(map[string]interface{})["attachments"].(map[string]string)
attachments[constant.LOCAL_ADDR] = session.LocalAddr()
attachments[constant.REMOTE_ADDR] = session.RemoteAddr()
args := p.Body.(map[string]interface{})["args"].([]interface{})
inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments)
ctx := rebuildCtx(inv)
result := invoker.Invoke(ctx, inv)
if err := result.Error(); err != nil {
p.Header.ResponseStatus = hessian.Response_OK
p.Body = hessian.NewResponse(nil, err, result.Attachments())
} else {
res := result.Result()
p.Header.ResponseStatus = hessian.Response_OK
p.Body = hessian.NewResponse(res, nil, result.Attachments())
}
}
if !twoway {
return
}
reply(session, p, hessian.PackageResponse)
}
// OnCron ...
func (h *RpcServerHandler) OnCron(session getty.Session) {
var (
flag bool
active time.Time
)
h.rwlock.RLock()
if _, ok := h.sessionMap[session]; ok {
active = session.GetActive()
if h.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() {
flag = true
logger.Warnf("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum)
}
}
h.rwlock.RUnlock()
if flag {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
}
}
// rebuildCtx rebuild the context by attachment.
// Once we decided to transfer more context's key-value, we should change this.
// now we only support rebuild the tracing context
func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
ctx := context.Background()
// actually, if user do not use any opentracing framework, the err will not be nil.
spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
opentracing.TextMapCarrier(inv.Attachments()))
if err == nil {
ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
}
return ctx
}
func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
resp := &DubboPackage{
Header: hessian.DubboHeader{
SerialID: req.Header.SerialID,
Type: tp,
ID: req.Header.ID,
ResponseStatus: req.Header.ResponseStatus,
},
}
if req.Header.Type&hessian.PackageRequest != 0x00 {
resp.Body = req.Body
} else {
resp.Body = nil
}
if err := session.WritePkg(resp, WritePkg_Timeout); err != nil {
logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.Header)
}
}