blob: 127add3e57009bcc35d0f6a5c03f40da5471959f [file] [log] [blame]
package server
import (
"github.com/pkg/errors"
"github.com/dubbogo/getty"
"github.com/dk-lockdown/seata-golang/logging"
"github.com/dk-lockdown/seata-golang/meta"
"github.com/dk-lockdown/seata-golang/model"
"github.com/dk-lockdown/seata-golang/protocal"
"strconv"
"strings"
"sync"
)
var (
/**
* resourceId -> applicationId -> ip -> port -> RpcContext
*/
rm_sessions = sync.Map{}
/**
* ip+appname -> port -> RpcContext
*/
tm_sessions = sync.Map{}
)
const (
ClientIdSplitChar = ":"
DbkeysSplitChar = ","
)
type GettySessionManager struct {
IdentifiedSessions *sync.Map
}
var SessionManager GettySessionManager
func init() {
SessionManager = GettySessionManager{IdentifiedSessions:&sync.Map{}}
}
func (manager *GettySessionManager) IsRegistered(session getty.Session) bool {
_,ok := manager.IdentifiedSessions.Load(session)
return ok
}
func (manager *GettySessionManager) GetRoleFromGettySession(session getty.Session) meta.TransactionRole {
context, ok := manager.IdentifiedSessions.Load(session)
if ok {
return context.(*RpcContext).ClientRole
}
return 0
}
func (manager *GettySessionManager) GetContextFromIdentified(session getty.Session) *RpcContext {
context, ok := manager.IdentifiedSessions.Load(session)
if ok {
rpcContext := context.(*RpcContext)
return rpcContext
}
return nil
}
func (manager *GettySessionManager) RegisterTmGettySession(request protocal.RegisterTMRequest,session getty.Session) {
//todo check version
rpcContext := buildGettySessionHolder(meta.TMROLE,request.Version,request.ApplicationId,request.TransactionServiceGroup,"",session)
rpcContext.HoldInIdentifiedGettySessions(manager.IdentifiedSessions)
clientIdentified := rpcContext.ApplicationId + ClientIdSplitChar + getClientIpFromGettySession(session)
clientIdentifiedMap,_ := tm_sessions.LoadOrStore(clientIdentified,&sync.Map{})
cMap := clientIdentifiedMap.(*sync.Map)
rpcContext.HoldInClientGettySessions(cMap)
}
func (manager *GettySessionManager) RegisterRmGettySession(resourceManagerRequest protocal.RegisterRMRequest,session getty.Session){
//todo check version
var rpcContext *RpcContext
dbKeySet := dbKeyToSet(resourceManagerRequest.ResourceIds)
context,ok := manager.IdentifiedSessions.Load(session)
if ok {
rpcContext = context.(*RpcContext)
rpcContext.AddResources(dbKeySet)
} else {
rpcContext = buildGettySessionHolder(meta.RMROLE,resourceManagerRequest.Version,resourceManagerRequest.ApplicationId,
resourceManagerRequest.TransactionServiceGroup,resourceManagerRequest.ResourceIds,session)
rpcContext.HoldInIdentifiedGettySessions(manager.IdentifiedSessions)
}
if dbKeySet == nil || dbKeySet.IsEmpty() { return }
for _,resourceId := range dbKeySet.List() {
applicationMap,_ := rm_sessions.LoadOrStore(resourceId,&sync.Map{})
aMap,_ := applicationMap.(*sync.Map)
ipMap,_ := aMap.LoadOrStore(resourceManagerRequest.ApplicationId,&sync.Map{})
iMap,_ := ipMap.(*sync.Map)
clientIp := getClientIpFromGettySession(session)
portMap,_ := iMap.LoadOrStore(clientIp,&sync.Map{})
pMap,_ := portMap.(*sync.Map)
rpcContext.HoldInResourceManagerGettySessions(resourceId,pMap)
// 老实讲,我不知道为什么要写这么一个方法,双重保证?
manager.updateGettySessionsResource(resourceId,clientIp,resourceManagerRequest.ApplicationId)
}
}
func (manager *GettySessionManager) updateGettySessionsResource(resourceId string,clientIp string,applicationId string) {
applicationMap,_ := rm_sessions.Load(resourceId)
aMap,_ := applicationMap.(*sync.Map)
ipMap,_ := aMap.Load(applicationId)
iMap,_ := ipMap.(*sync.Map)
portMap,_ := iMap.Load(clientIp)
pMap,_ := portMap.(*sync.Map)
rm_sessions.Range(func (key interface{},value interface{}) bool {
resourceKey,ok := key.(string)
if ok && resourceKey != resourceId {
appMap,_ := value.(*sync.Map)
clientIpMap,clientIpMapLoaded := appMap.Load(applicationId)
if clientIpMapLoaded {
cipMap,_ := clientIpMap.(*sync.Map)
clientPortMap, clientPortMapLoaded := cipMap.Load(clientIp)
if clientPortMapLoaded {
cpMap := clientPortMap.(*sync.Map)
cpMap.Range(func (key interface{},value interface{}) bool{
port,_ := key.(int)
rpcContext,_ := value.(*RpcContext)
_, ok := pMap.LoadOrStore(port,rpcContext)
if ok {
rpcContext.HoldInResourceManagerGettySessionsWithoutPortMap(resourceId,port)
}
return true
})
}
}
}
return true
})
}
func (manager *GettySessionManager) GetSameClientGettySession(session getty.Session) getty.Session {
if !session.IsClosed() {
return session
}
rpcContext := manager.GetContextFromIdentified(session)
if rpcContext == nil {
logging.Logger.Errorf("rpcContext is null,channel:{%v},active:{%t}",session,!session.IsClosed())
}
if !rpcContext.session.IsClosed() {
return rpcContext.session
}
clientPort := getClientPortFromGettySession(session)
if rpcContext.ClientRole == meta.TMROLE {
clientIdentified := rpcContext.ApplicationId + ClientIdSplitChar + getClientIpFromGettySession(session)
clientRpcMap, ok := tm_sessions.Load(clientIdentified)
if !ok {
return nil
}
clientMap := clientRpcMap.(*sync.Map)
return getGettySessionFromSameClientMap(clientMap,clientPort)
} else if rpcContext.ClientRole == meta.RMROLE {
var sameClientSession getty.Session
rpcContext.ClientRMHolderMap.Range(func (key interface{},value interface{}) bool {
clientRmMap := value.(*sync.Map)
sameClientSession = getGettySessionFromSameClientMap(clientRmMap,clientPort)
if sameClientSession != nil {
return false
}
return true
})
return sameClientSession
}
return nil
}
func getGettySessionFromSameClientMap(clientGettySessionMap *sync.Map,exclusivePort int) getty.Session {
var session getty.Session
if clientGettySessionMap != nil {
clientGettySessionMap.Range(func (key interface{},value interface{}) bool {
port,ok := key.(int)
if ok {
if port == exclusivePort {
clientGettySessionMap.Delete(key)
return true
}
}
context := value.(*RpcContext)
session = context.session
if !session.IsClosed() {
return false
}
clientGettySessionMap.Delete(key)
return true
})
}
return session
}
func (manager *GettySessionManager) GetGettySession(resourceId string,clientId string) (getty.Session,error) {
var resultSession getty.Session
clientIdInfo := strings.Split(clientId,ClientIdSplitChar)
if clientIdInfo == nil || len(clientIdInfo) != 3 {
return nil,errors.Errorf("Invalid Client ID:%d",clientId)
}
targetApplicationId := clientIdInfo[0]
targetIP := clientIdInfo[1]
targetPort,_ := strconv.Atoi(clientIdInfo[2])
applicationMap,ok := rm_sessions.Load(resourceId)
if targetApplicationId == "" || !ok || applicationMap == nil {
logging.Logger.Infof("No channel is available for resource[%s]",resourceId)
}
appMap,_ := applicationMap.(*sync.Map)
clientIpMap,clientIpMapLoaded := appMap.Load(targetApplicationId)
if clientIpMapLoaded {
ipMap,_ := clientIpMap.(*sync.Map)
portMap,portMapLoaded := ipMap.Load(targetIP)
if portMapLoaded {
pMap,_ := portMap.(*sync.Map)
context,contextLoaded := pMap.Load(targetPort)
// Firstly, try to find the original channel through which the branch was registered.
if contextLoaded {
rpcContext := context.(*RpcContext)
if !rpcContext.session.IsClosed() {
resultSession = rpcContext.session
logging.Logger.Debugf("Just got exactly the one %v for %s",rpcContext.session,clientId)
} else {
pMap.Delete(targetPort)
logging.Logger.Infof("Removed inactive %d",rpcContext.session)
}
}
// The original channel was broken, try another one.
if resultSession == nil {
pMap.Range(func (key interface{},value interface{}) bool {
rpcContext := value.(*RpcContext)
if !rpcContext.session.IsClosed() {
resultSession = rpcContext.session
logging.Logger.Infof("Choose %v on the same IP[%s] as alternative of %s",rpcContext.session,targetIP,clientId)
//跳出 range 循环
return false
} else {
pMap.Delete(key)
logging.Logger.Infof("Removed inactive %d",rpcContext.session)
}
return true
})
}
}
// No channel on the this app node, try another one.
if resultSession == nil {
ipMap.Range(func (key interface{},value interface{}) bool {
ip := key.(string)
if ip == targetIP { return true }
portMapOnOtherIP,_ := value.(*sync.Map)
if portMapOnOtherIP == nil { return true }
portMapOnOtherIP.Range(func (key interface{},value interface {}) bool {
rpcContext := value.(*RpcContext)
if !rpcContext.session.IsClosed() {
resultSession = rpcContext.session
logging.Logger.Infof("Choose %v on the same application[%s] as alternative of %s",rpcContext.session,targetApplicationId,clientId)
//跳出 range 循环
return false
} else {
portMapOnOtherIP.Delete(key)
logging.Logger.Infof("Removed inactive %d",rpcContext.session)
}
return true
})
if resultSession != nil { return false }
return true
})
}
}
if resultSession == nil {
resultSession = tryOtherApp(appMap,targetApplicationId)
if resultSession == nil {
logging.Logger.Infof("No channel is available for resource[%s] as alternative of %s",resourceId,clientId)
} else {
logging.Logger.Infof("Choose %v on the same resource[%s] as alternative of %s", resultSession, resourceId, clientId)
}
}
return resultSession,nil
}
func tryOtherApp(applicationMap *sync.Map,myApplicationId string) getty.Session {
var chosenChannel getty.Session
applicationMap.Range(func (key interface{},value interface{}) bool {
applicationId := key.(string)
if myApplicationId != "" && applicationId == myApplicationId {return true}
targetIPMap,_ := value.(*sync.Map)
targetIPMap.Range(func (key interface{},value interface{}) bool {
if value == nil { return true }
portMap,_ := value.(*sync.Map)
portMap.Range(func (key interface{},value interface{}) bool {
rpcContext := value.(*RpcContext)
if !rpcContext.session.IsClosed() {
chosenChannel = rpcContext.session
return false
} else {
portMap.Delete(key)
logging.Logger.Infof("Removed inactive %d",rpcContext.session)
}
return true
})
if chosenChannel != nil { return false }
return true
})
if chosenChannel != nil { return false }
return true
})
return chosenChannel
}
func buildGettySessionHolder(role meta.TransactionRole,version string,applicationId string,
txServiceGroup string,dbKeys string,session getty.Session) *RpcContext {
return &RpcContext{
ClientRole: role,
Version: version,
ApplicationId: applicationId,
TransactionServiceGroup: txServiceGroup,
ClientId: buildClientId(applicationId,session),
session: session,
ResourceSets: dbKeyToSet(dbKeys),
}
}
func dbKeyToSet(dbKey string) *model.Set {
if dbKey == "" {
return nil
}
keys := strings.Split(dbKey,DbkeysSplitChar)
set := model.NewSet()
for _,key := range keys {
set.Add(key)
}
return set
}
func buildClientId(applicationId string, session getty.Session) string {
return applicationId + ClientIdSplitChar + session.RemoteAddr()
}
func (manager *GettySessionManager) GetRmSessions() map[string]getty.Session {
sessions := make(map[string]getty.Session)
rm_sessions.Range(func (key interface{},value interface{}) bool {
resourceId,_ := key.(string)
applicationMap := value.(*sync.Map)
session := tryOtherApp(applicationMap,"")
if session == nil {
return false
}
sessions[resourceId] = session
return true
})
return sessions
}