| package server |
| |
| import ( |
| "fmt" |
| "sync" |
| "time" |
| |
| "github.com/dubbogo/getty" |
| "github.com/pkg/errors" |
| "go.uber.org/atomic" |
| |
| "github.com/dk-lockdown/seata-golang/logging" |
| "github.com/dk-lockdown/seata-golang/meta" |
| "github.com/dk-lockdown/seata-golang/protocal" |
| "github.com/dk-lockdown/seata-golang/protocal/codec" |
| "github.com/dk-lockdown/seata-golang/tc/config" |
| "github.com/dk-lockdown/seata-golang/tc/event" |
| "github.com/dk-lockdown/seata-golang/tc/holder" |
| "github.com/dk-lockdown/seata-golang/tc/lock" |
| "github.com/dk-lockdown/seata-golang/tc/session" |
| "github.com/dk-lockdown/seata-golang/util" |
| ) |
| |
| const ( |
| // RPC_REQUEST_TIMEOUT is the default timeout for synchronous requests sent to clients. |
| RPC_REQUEST_TIMEOUT = 30 * time.Second |
| // ALWAYS_RETRY_BOUNDARY: a max retry timeout configured below this value (e.g. -1) never |
| // expires, i.e. commit/rollback retries continue forever. |
| ALWAYS_RETRY_BOUNDARY = 0 |
| ) |
| |
| // MessageFuture tracks an in-flight request sent to a remote client: the message id, the |
| // eventual response or error, and a done channel signalled once the response is received. |
| type MessageFuture struct { |
| id int32 |
| err error |
| response interface{} |
| done chan bool |
| } |
| |
| // NewMessageFuture creates a MessageFuture keyed by the id of the given RpcMessage. |
| func NewMessageFuture(message protocal.RpcMessage) *MessageFuture { |
| return &MessageFuture{ |
| id: message.Id, |
| done: make(chan bool), |
| } |
| } |
| |
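| // DefaultCoordinator is the transaction coordinator (TC) side handler: it implements the |
| // getty session callbacks, dispatches incoming RPC messages to the transaction core, and |
| // runs the background tickers for timeout checks, commit/rollback retries, async |
| // committing and undo-log cleanup. |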
| type DefaultCoordinator struct { |
| conf config.ServerConfig |
| core ITransactionCoordinator |
| idGenerator atomic.Uint32 |
| futures *sync.Map |
| timeoutCheckTicker *time.Ticker |
| retryRollbackingTicker *time.Ticker |
| retryCommittingTicker *time.Ticker |
| asyncCommittingTicker *time.Ticker |
| undoLogDeleteTicker *time.Ticker |
| } |
| |
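| // NewDefaultCoordinator wires the coordinator to the transaction core and starts the five |
| // background ticker loops. A minimal usage sketch (conf stands for any populated |
| // config.ServerConfig value; the snippet is illustrative only): |
| // |
| //   coordinator := NewDefaultCoordinator(conf) |
| //   defer coordinator.Stop() |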
| func NewDefaultCoordinator(conf config.ServerConfig) *DefaultCoordinator { |
| coordinator := &DefaultCoordinator{ |
| conf: conf, |
| idGenerator: atomic.Uint32{}, |
| futures: &sync.Map{}, |
| timeoutCheckTicker: time.NewTicker(conf.TimeoutRetryPeriod), |
| retryRollbackingTicker: time.NewTicker(conf.RollbackingRetryPeriod), |
| retryCommittingTicker: time.NewTicker(conf.CommittingRetryPeriod), |
| asyncCommittingTicker: time.NewTicker(conf.AsynCommittingRetryPeriod), |
| undoLogDeleteTicker: time.NewTicker(conf.LogDeletePeriod), |
| } |
| core := NewCore(coordinator) |
| coordinator.core = core |
| |
| go coordinator.processTimeoutCheck() |
| go coordinator.processRetryRollbacking() |
| go coordinator.processRetryCommitting() |
| go coordinator.processAsyncCommitting() |
| go coordinator.processUndoLogDelete() |
| return coordinator |
| } |
| |
| |
| func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error { |
| logging.Logger.Infof("got session:%s", session.Stat()) |
| return nil |
| } |
| |
| func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) { |
| session.Close() |
| logging.Logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) |
| } |
| |
| func (coordinator *DefaultCoordinator) OnClose(session getty.Session) { |
| logging.Logger.Info("session{%s} is closing......", session.Stat()) |
| } |
| |
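| // OnMessage dispatches every inbound packet: TM registrations and heartbeat pings are |
| // handled directly; other requests go through OnRegRmMessage or OnTrxMessage once the |
| // session is registered; anything else is treated as the response to a pending |
| // MessageFuture and completes it. |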
| func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) { |
| logging.Logger.Info("received message:{%v}", pkg) |
| rpcMessage,ok := pkg.(protocal.RpcMessage) |
| if ok { |
| _,isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest) |
| if isRegTM { |
| coordinator.OnRegTmMessage(rpcMessage,session) |
| return |
| } |
| |
| heartBeat,isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage) |
| if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing { |
| coordinator.OnCheckMessage(rpcMessage,session) |
| return |
| } |
| |
| if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST || |
| rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY { |
| logging.Logger.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body) |
| _,isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest) |
| if isRegRM { |
| coordinator.OnRegRmMessage(rpcMessage,session) |
| } else { |
| if SessionManager.IsRegistered(session) { |
| coordinator.OnTrxMessage(rpcMessage,session) |
| } else { |
| session.Close() |
| logging.Logger.Infof("close a unhandled connection! [%v]", session) |
| } |
| } |
| } else { |
| resp,loaded := coordinator.futures.Load(rpcMessage.Id) |
| if loaded { |
| response := resp.(*MessageFuture) |
| response.response = rpcMessage.Body |
| response.done <- true |
| coordinator.futures.Delete(rpcMessage.Id) |
| } |
| } |
| } |
| } |
| |
| // OnCron is a no-op; periodic maintenance is driven by the coordinator's own tickers instead. |
| func (coordinator *DefaultCoordinator) OnCron(session getty.Session) { |
| } |
| |
| ///////////////////////////////////////////////////////////// |
| // ServerMessageListener |
| ///////////////////////////////////////////////////////////// |
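| // OnTrxMessage unpacks a transaction request (or a MergedWarpMessage batch), hands each |
| // message to handleTrxMessage, and sends the result(s) back on the originating session. |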
| func (coordinator *DefaultCoordinator) OnTrxMessage(rpcMessage protocal.RpcMessage, session getty.Session) { |
| rpcContext := SessionManager.GetContextFromIdentified(session) |
| logging.Logger.Debugf("server received:%v,clientIp:%s,vgroup:%s",rpcMessage.Body,session.RemoteAddr(),rpcContext.TransactionServiceGroup) |
| |
| warpMessage, isWarpMessage := rpcMessage.Body.(protocal.MergedWarpMessage) |
| if isWarpMessage { |
| resultMessage := protocal.MergeResultMessage{Msgs:make([]protocal.MessageTypeAware,0)} |
| for _,msg := range warpMessage.Msgs { |
| resp := coordinator.handleTrxMessage(msg,*rpcContext) |
| resultMessage.Msgs = append(resultMessage.Msgs,resp) |
| } |
| coordinator.SendResponse(rpcMessage,rpcContext.session,resultMessage) |
| } else { |
| message := rpcMessage.Body.(protocal.MessageTypeAware) |
| resp := coordinator.handleTrxMessage(message,*rpcContext) |
| coordinator.SendResponse(rpcMessage,rpcContext.session,resp) |
| } |
| } |
| |
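| // handleTrxMessage routes a single request to the matching TCInboundHandler method by its |
| // message type code; unknown type codes yield a nil response. |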
| func (coordinator *DefaultCoordinator) handleTrxMessage(msg protocal.MessageTypeAware,ctx RpcContext) protocal.MessageTypeAware { |
| switch msg.GetTypeCode() { |
| case protocal.TypeGlobalBegin: |
| req := msg.(protocal.GlobalBeginRequest) |
| resp := coordinator.doGlobalBegin(req,ctx) |
| return resp |
| case protocal.TypeGlobalStatus: |
| req := msg.(protocal.GlobalStatusRequest) |
| resp := coordinator.doGlobalStatus(req,ctx) |
| return resp |
| case protocal.TypeGlobalReport: |
| req := msg.(protocal.GlobalReportRequest) |
| resp := coordinator.doGlobalReport(req,ctx) |
| return resp |
| case protocal.TypeGlobalCommit: |
| req := msg.(protocal.GlobalCommitRequest) |
| resp := coordinator.doGlobalCommit(req,ctx) |
| return resp |
| case protocal.TypeGlobalRollback: |
| req := msg.(protocal.GlobalRollbackRequest) |
| resp := coordinator.doGlobalRollback(req,ctx) |
| return resp |
| case protocal.TypeBranchRegister: |
| req := msg.(protocal.BranchRegisterRequest) |
| resp := coordinator.doBranchRegister(req,ctx) |
| return resp |
| case protocal.TypeBranchStatusReport: |
| req := msg.(protocal.BranchReportRequest) |
| resp := coordinator.doBranchReport(req,ctx) |
| return resp |
| default: |
| return nil |
| } |
| } |
| |
| func (coordinator *DefaultCoordinator) OnRegRmMessage(rpcMessage protocal.RpcMessage, session getty.Session) { |
| message := rpcMessage.Body.(protocal.RegisterRMRequest) |
| |
| //TODO: check the client's version for compatibility (not implemented yet) |
| SessionManager.RegisterRmGettySession(message,session) |
| logging.Logger.Debugf("checkAuth for client:%s,vgroup:%s,applicationId:%s",session.RemoteAddr(),message.TransactionServiceGroup,message.ApplicationId) |
| |
| coordinator.SendResponse(rpcMessage,session,protocal.RegisterRMResponse{AbstractIdentifyResponse: protocal.AbstractIdentifyResponse{Identified: true}}) |
| } |
| |
| func (coordinator *DefaultCoordinator) OnRegTmMessage(rpcMessage protocal.RpcMessage, session getty.Session) { |
| message := rpcMessage.Body.(protocal.RegisterTMRequest) |
| |
| //TODO: check the client's version for compatibility (not implemented yet) |
| SessionManager.RegisterTmGettySession(message,session) |
| logging.Logger.Debugf("checkAuth for client:%s,vgroup:%s,applicationId:%s",session.RemoteAddr(),message.TransactionServiceGroup,message.ApplicationId) |
| |
| coordinator.SendResponse(rpcMessage,session,protocal.RegisterTMResponse{AbstractIdentifyResponse: protocal.AbstractIdentifyResponse{Identified: true}}) |
| } |
| |
| func (coordinator *DefaultCoordinator) OnCheckMessage(rpcMessage protocal.RpcMessage, session getty.Session) { |
| coordinator.SendResponse(rpcMessage,session,protocal.HeartBeatMessagePong) |
| logging.Logger.Debugf("received PING from %s", session.RemoteAddr()) |
| } |
| |
| ///////////////////////////////////////////////////////////// |
| // ServerMessageSender |
| ///////////////////////////////////////////////////////////// |
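| // SendResponse replies on the originating session for heartbeats; for everything else it |
| // looks up a live session of the same client via the SessionManager and only writes when |
| // such a session is found. |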
| func (coordinator *DefaultCoordinator) SendResponse(request protocal.RpcMessage, session getty.Session, msg interface{}) { |
| var ss = session |
| _,ok := msg.(protocal.HeartBeatMessage) |
| if !ok { |
| ss = SessionManager.GetSameClientGettySession(session) |
| } |
| if ss != nil { |
| coordinator.defaultSendResponse(request,ss,msg) |
| } |
| } |
| |
| |
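| // SendSyncRequest sends message to the RM identified by resourceId and clientId and waits |
| // up to RPC_REQUEST_TIMEOUT for the reply. Illustrative call (the resource id, client id |
| // and request variable below are made-up examples): |
| // |
| //   resp, err := coordinator.SendSyncRequest("order_db", "client-1", branchRollbackRequest) |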
| func (coordinator *DefaultCoordinator) SendSyncRequest(resourceId string, clientId string, message interface{}) (interface{},error) { |
| return coordinator.SendSyncRequestWithTimeout(resourceId,clientId,message,RPC_REQUEST_TIMEOUT) |
| } |
| |
| |
| func (coordinator *DefaultCoordinator) SendSyncRequestWithTimeout(resourceId string, clientId string, message interface{}, timeout time.Duration) (interface{},error) { |
| session,err := SessionManager.GetGettySession(resourceId,clientId) |
| if err != nil { |
| return nil, errors.WithStack(err) |
| } |
| return coordinator.sendAsyncRequestWithResponse("",session,message,timeout) |
| } |
| |
| |
| func (coordinator *DefaultCoordinator) SendSyncRequestByGettySession(session getty.Session, message interface{}) (interface{},error) { |
| return coordinator.SendSyncRequestByGettySessionWithTimeout(session,message,RPC_REQUEST_TIMEOUT) |
| } |
| |
| |
| func (coordinator *DefaultCoordinator) SendSyncRequestByGettySessionWithTimeout(session getty.Session, message interface{}, timeout time.Duration) (interface{},error) { |
| if session == nil { |
| return nil,errors.New("rm client is not connected") |
| } |
| return coordinator.sendAsyncRequestWithResponse("",session,message,timeout) |
| } |
| |
| |
| func (coordinator *DefaultCoordinator) SendASyncRequest(session getty.Session, message interface{}) error { |
| return coordinator.sendAsyncRequestWithoutResponse(session,message) |
| } |
| |
| func (coordinator *DefaultCoordinator) sendAsyncRequestWithResponse(address string,session getty.Session,msg interface{},timeout time.Duration) (interface{},error) { |
| if timeout <= time.Duration(0) { |
| return nil,errors.New("timeout should more than 0ms") |
| } |
| return coordinator.sendAsyncRequest(address,session,msg,timeout) |
| } |
| |
| func (coordinator *DefaultCoordinator) sendAsyncRequestWithoutResponse(session getty.Session,msg interface{}) error { |
| _,err := coordinator.sendAsyncRequest("",session,msg,time.Duration(0)) |
| return err |
| } |
| |
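| // sendAsyncRequest assigns the message a fresh id, registers a MessageFuture keyed by that |
| // id, writes the packet, and, when a timeout is given, blocks until the matching response |
| // arrives via OnMessage or the timeout fires (in which case the future is discarded). |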
| func (coordinator *DefaultCoordinator) sendAsyncRequest(address string,session getty.Session,msg interface{},timeout time.Duration) (interface{},error) { |
| if session == nil { |
| logging.Logger.Warn("sendAsyncRequest does nothing, the session is nil.") |
| return nil, errors.New("session is nil") |
| } |
| rpcMessage := protocal.RpcMessage{ |
| Id: int32(coordinator.idGenerator.Inc()), |
| MessageType: protocal.MSGTYPE_RESQUEST_ONEWAY, |
| Codec: codec.SEATA, |
| Compressor: 0, |
| Body: msg, |
| } |
| resp := NewMessageFuture(rpcMessage) |
| coordinator.futures.Store(rpcMessage.Id, resp) |
| //TODO: make the write deadline configurable instead of the hard-coded value below |
| err := session.WritePkg(rpcMessage, 60000) |
| if err != nil { |
| coordinator.futures.Delete(rpcMessage.Id) |
| return nil, errors.WithStack(err) |
| } |
| |
| if timeout > time.Duration(0) { |
| select { |
| case <-getty.GetTimeWheel().After(timeout): |
| coordinator.futures.Delete(rpcMessage.Id) |
| return nil, errors.Errorf("wait response timeout,ip:%s,request:%v", address, rpcMessage) |
| case <-resp.done: |
| err = resp.err |
| } |
| return resp.response, err |
| } |
| return nil,err |
| } |
| |
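| // defaultSendResponse echoes the request id, codec and compressor back to the client and |
| // marks the packet as a heartbeat response or a normal response accordingly. |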
| func (coordinator *DefaultCoordinator) defaultSendResponse(request protocal.RpcMessage, session getty.Session, msg interface{}) { |
| resp := protocal.RpcMessage{ |
| Id: request.Id, |
| Codec: request.Codec, |
| Compressor: request.Compressor, |
| Body: msg, |
| } |
| _,ok := msg.(protocal.HeartBeatMessage) |
| if ok { |
| resp.MessageType = protocal.MSGTYPE_HEARTBEAT_RESPONSE |
| } else { |
| resp.MessageType = protocal.MSGTYPE_RESPONSE |
| } |
| if err := session.WritePkg(resp, time.Duration(0)); err != nil { |
| logging.Logger.Errorf("failed to send response, request id %d: %v", resp.Id, err) |
| } |
| } |
| |
| ///////////////////////////////////////////////////////////// |
| // TCInboundHandler |
| ///////////////////////////////////////////////////////////// |
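| // doGlobalBegin starts a new global transaction via the core and returns its XID; errors |
| // are mapped onto the response's result code, transaction exception code and message. The |
| // same error-mapping pattern is used by the other do* handlers below. |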
| func (coordinator *DefaultCoordinator) doGlobalBegin(request protocal.GlobalBeginRequest,ctx RpcContext) protocal.GlobalBeginResponse { |
| var resp = protocal.GlobalBeginResponse{} |
| xid,err := coordinator.core.Begin(ctx.ApplicationId,ctx.TransactionServiceGroup,request.TransactionName,request.Timeout) |
| if err != nil { |
| trxException, ok := err.(meta.TransactionException) |
| resp.ResultCode = protocal.ResultCodeFailed |
| if ok { |
| resp.TransactionExceptionCode = trxException.Code |
| resp.Msg = fmt.Sprintf("TransactionException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch TransactionException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Msg = fmt.Sprintf("RuntimeException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch RuntimeException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Xid = xid |
| resp.ResultCode = protocal.ResultCodeSuccess |
| return resp |
| } |
| |
| func (coordinator *DefaultCoordinator) doGlobalStatus(request protocal.GlobalStatusRequest,ctx RpcContext) protocal.GlobalStatusResponse { |
| var resp = protocal.GlobalStatusResponse{} |
| globalStatus,err := coordinator.core.GetStatus(request.Xid) |
| if err != nil { |
| trxException, ok := err.(meta.TransactionException) |
| resp.ResultCode = protocal.ResultCodeFailed |
| if ok { |
| resp.TransactionExceptionCode = trxException.Code |
| resp.Msg = fmt.Sprintf("TransactionException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch TransactionException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Msg = fmt.Sprintf("RuntimeException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch RuntimeException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.GlobalStatus = globalStatus |
| resp.ResultCode = protocal.ResultCodeSuccess |
| return resp |
| } |
| |
| func (coordinator *DefaultCoordinator) doGlobalReport(request protocal.GlobalReportRequest,ctx RpcContext) protocal.GlobalReportResponse { |
| var resp = protocal.GlobalReportResponse{} |
| globalStatus,err := coordinator.core.GlobalReport(request.Xid,request.GlobalStatus) |
| if err != nil { |
| trxException, ok := err.(meta.TransactionException) |
| resp.ResultCode = protocal.ResultCodeFailed |
| if ok { |
| resp.TransactionExceptionCode = trxException.Code |
| resp.Msg = fmt.Sprintf("TransactionException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch TransactionException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Msg = fmt.Sprintf("RuntimeException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch RuntimeException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.GlobalStatus = globalStatus |
| resp.ResultCode = protocal.ResultCodeSuccess |
| return resp |
| } |
| |
| |
| func (coordinator *DefaultCoordinator) doGlobalCommit(request protocal.GlobalCommitRequest,ctx RpcContext) protocal.GlobalCommitResponse { |
| var resp = protocal.GlobalCommitResponse{} |
| globalStatus,err := coordinator.core.Commit(request.Xid) |
| if err != nil { |
| trxException, ok := err.(meta.TransactionException) |
| resp.ResultCode = protocal.ResultCodeFailed |
| if ok { |
| resp.TransactionExceptionCode = trxException.Code |
| resp.Msg = fmt.Sprintf("TransactionException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch TransactionException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Msg = fmt.Sprintf("RuntimeException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch RuntimeException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.GlobalStatus = globalStatus |
| resp.ResultCode = protocal.ResultCodeSuccess |
| return resp |
| } |
| |
| func (coordinator *DefaultCoordinator) doGlobalRollback(request protocal.GlobalRollbackRequest,ctx RpcContext) protocal.GlobalRollbackResponse { |
| var resp = protocal.GlobalRollbackResponse{} |
| globalStatus,err := coordinator.core.Rollback(request.Xid) |
| if err != nil { |
| trxException, ok := err.(meta.TransactionException) |
| resp.ResultCode = protocal.ResultCodeFailed |
| globalSession := holder.GetSessionHolder().FindGlobalSessionWithBranchSessions(request.Xid,false) |
| if globalSession == nil { |
| resp.GlobalStatus = meta.GlobalStatusFinished |
| } else { |
| resp.GlobalStatus = globalSession.Status |
| } |
| if ok { |
| resp.TransactionExceptionCode = trxException.Code |
| resp.Msg = fmt.Sprintf("TransactionException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch TransactionException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Msg = fmt.Sprintf("RuntimeException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch RuntimeException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.GlobalStatus = globalStatus |
| resp.ResultCode = protocal.ResultCodeSuccess |
| return resp |
| } |
| |
| func (coordinator *DefaultCoordinator) doBranchRegister(request protocal.BranchRegisterRequest,ctx RpcContext) protocal.BranchRegisterResponse { |
| var resp = protocal.BranchRegisterResponse{} |
| branchId,err := coordinator.core.BranchRegister(request.BranchType,request.ResourceId,ctx.ClientId,request.Xid,request.ApplicationData,request.LockKey) |
| if err != nil { |
| trxException, ok := err.(meta.TransactionException) |
| resp.ResultCode = protocal.ResultCodeFailed |
| if ok { |
| resp.TransactionExceptionCode = trxException.Code |
| resp.Msg = fmt.Sprintf("TransactionException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch TransactionException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Msg = fmt.Sprintf("RuntimeException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch RuntimeException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.BranchId = branchId |
| resp.ResultCode = protocal.ResultCodeSuccess |
| return resp |
| } |
| |
| func (coordinator *DefaultCoordinator) doBranchReport(request protocal.BranchReportRequest,ctx RpcContext) protocal.BranchReportResponse { |
| var resp = protocal.BranchReportResponse{} |
| err := coordinator.core.BranchReport(request.BranchType,request.Xid,request.BranchId,request.Status,request.ApplicationData) |
| if err != nil { |
| trxException, ok := err.(meta.TransactionException) |
| resp.ResultCode = protocal.ResultCodeFailed |
| if ok { |
| resp.TransactionExceptionCode = trxException.Code |
| resp.Msg = fmt.Sprintf("TransactionException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch TransactionException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Msg = fmt.Sprintf("RuntimeException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch RuntimeException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.ResultCode = protocal.ResultCodeSuccess |
| return resp |
| } |
| |
| func (coordinator *DefaultCoordinator) doLockCheck(request protocal.GlobalLockQueryRequest,ctx RpcContext) protocal.GlobalLockQueryResponse { |
| var resp = protocal.GlobalLockQueryResponse{} |
| result, err := coordinator.core.LockQuery(request.BranchType,request.ResourceId,request.Xid,request.LockKey) |
| if err != nil { |
| trxException, ok := err.(meta.TransactionException) |
| resp.ResultCode = protocal.ResultCodeFailed |
| if ok { |
| resp.TransactionExceptionCode = trxException.Code |
| resp.Msg = fmt.Sprintf("TransactionException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch TransactionException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Msg = fmt.Sprintf("RuntimeException[%s]",err.Error()) |
| logging.Logger.Errorf("Catch RuntimeException while do RPC, request: %v", request) |
| return resp |
| } |
| resp.Lockable = result |
| resp.ResultCode = protocal.ResultCodeSuccess |
| return resp |
| } |
| |
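| // The process* goroutines below block on their tickers and invoke the matching maintenance |
| // routine on every tick; they run for the lifetime of the coordinator. |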
| func (coordinator *DefaultCoordinator) processTimeoutCheck() { |
| for { |
| <- coordinator.timeoutCheckTicker.C |
| coordinator.timeoutCheck() |
| } |
| } |
| |
| func (coordinator *DefaultCoordinator) processRetryRollbacking() { |
| for { |
| <- coordinator.retryRollbackingTicker.C |
| coordinator.handleRetryRollbacking() |
| } |
| } |
| |
| func (coordinator *DefaultCoordinator) processRetryCommitting() { |
| for { |
| <- coordinator.retryCommittingTicker.C |
| coordinator.handleRetryCommitting() |
| } |
| } |
| |
| func (coordinator *DefaultCoordinator) processAsyncCommitting() { |
| for { |
| <- coordinator.asyncCommittingTicker.C |
| coordinator.handleAsyncCommitting() |
| } |
| } |
| |
| func (coordinator *DefaultCoordinator) processUndoLogDelete() { |
| for { |
| <- coordinator.undoLogDeleteTicker.C |
| coordinator.undoLogDelete() |
| } |
| } |
| |
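| // timeoutCheck scans all active global sessions, moves any session that has passed its |
| // timeout to TimeoutRollbacking, publishes a global transaction event, and queues the |
| // session in the retry-rollbacking manager. |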
| func (coordinator *DefaultCoordinator) timeoutCheck() { |
| allSessions := holder.GetSessionHolder().RootSessionManager.AllSessions() |
| if len(allSessions) == 0 { |
| return |
| } |
| logging.Logger.Debugf("Transaction Timeout Check Begin: %d",len(allSessions)) |
| for _,globalSession := range allSessions { |
| logging.Logger.Debugf("%s %s %d %d",globalSession.Xid,globalSession.Status.String(),globalSession.BeginTime,globalSession.Timeout) |
| shouldTimeout := func(gs *session.GlobalSession) bool { |
| globalSession.Lock() |
| defer globalSession.Unlock() |
| if globalSession.Status != meta.GlobalStatusBegin || !globalSession.IsTimeout() { |
| return false |
| } |
| |
| if globalSession.Active { |
| globalSession.Active = false |
| } |
| changeGlobalSessionStatus(globalSession, meta.GlobalStatusTimeoutRollbacking) |
| evt := event.NewGlobalTransactionEvent(globalSession.TransactionId, event.RoleTC, globalSession.TransactionName, globalSession.BeginTime, 0, globalSession.Status) |
| event.EventBus.GlobalTransactionEventChannel <- evt |
| return true |
| }(globalSession) |
| if !shouldTimeout { |
| continue |
| } |
| logging.Logger.Infof("Global transaction[%s] is timeout and will be rolled back.",globalSession.Status) |
| holder.GetSessionHolder().RetryRollbackingSessionManager.AddGlobalSession(globalSession) |
| } |
| logging.Logger.Debug("Transaction Timeout Check End.") |
| } |
| |
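| // handleRetryRollbacking retries rollback for every queued rollbacking session; sessions |
| // whose retries have exceeded MaxRollbackRetryTimeout are removed (and their global lock |
| // released when RollbackRetryTimeoutUnlockEnable is set). |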
| func (coordinator *DefaultCoordinator) handleRetryRollbacking() { |
| rollbackingSessions := holder.GetSessionHolder().RetryRollbackingSessionManager.AllSessions() |
| if len(rollbackingSessions) == 0 { |
| return |
| } |
| now := util.CurrentTimeMillis() |
| for _,rollbackingSession := range rollbackingSessions { |
| if rollbackingSession.Status == meta.GlobalStatusRollbacking && !rollbackingSession.IsRollbackingDead() { |
| continue |
| } |
| if isRetryTimeout(int64(now),coordinator.conf.MaxRollbackRetryTimeout,rollbackingSession.BeginTime){ |
| if coordinator.conf.RollbackRetryTimeoutUnlockEnable { |
| lock.GetLockManager().ReleaseGlobalSessionLock(rollbackingSession) |
| } |
| holder.GetSessionHolder().RetryRollbackingSessionManager.RemoveGlobalSession(rollbackingSession) |
| logging.Logger.Errorf("GlobalSession rollback retry timeout and removed [%s]", rollbackingSession.Xid) |
| continue |
| } |
| _, err := coordinator.core.doGlobalRollback(rollbackingSession,true) |
| if err != nil { |
| logging.Logger.Infof("Failed to retry rollbacking [%s]",rollbackingSession.Xid) |
| } |
| } |
| } |
| |
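| // isRetryTimeout reports whether a session has been retrying longer than the configured |
| // limit; a limit below ALWAYS_RETRY_BOUNDARY (e.g. -1) never expires, i.e. retry forever. |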
| func isRetryTimeout(now int64,timeout int64,beginTime int64) bool { |
| if timeout >= ALWAYS_RETRY_BOUNDARY && now - beginTime > timeout { |
| return true |
| } |
| return false |
| } |
| |
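| // handleRetryCommitting retries commit for every queued committing session and removes |
| // sessions whose retries have exceeded MaxCommitRetryTimeout. |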
| func (coordinator *DefaultCoordinator) handleRetryCommitting() { |
| committingSessions := holder.GetSessionHolder().RetryCommittingSessionManager.AllSessions() |
| if len(committingSessions) == 0 { |
| return |
| } |
| now := util.CurrentTimeMillis() |
| for _,committingSession := range committingSessions { |
| if isRetryTimeout(int64(now),coordinator.conf.MaxCommitRetryTimeout,committingSession.BeginTime) { |
| holder.GetSessionHolder().RetryCommittingSessionManager.RemoveGlobalSession(committingSession) |
| logging.Logger.Errorf("GlobalSession commit retry timeout and removed [%s]", committingSession.Xid) |
| continue |
| } |
| _,err := coordinator.core.doGlobalCommit(committingSession,true) |
| if err != nil { |
| logging.Logger.Infof("Failed to retry committing [%s]",committingSession.Xid) |
| } |
| } |
| } |
| |
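| // handleAsyncCommitting drives the second commit phase for sessions in AsyncCommitting status. |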
| func (coordinator *DefaultCoordinator) handleAsyncCommitting() { |
| asyncCommittingSessions := holder.GetSessionHolder().AsyncCommittingSessionManager.AllSessions() |
| if len(asyncCommittingSessions) == 0 { |
| return |
| } |
| for _,asyncCommittingSession := range asyncCommittingSessions { |
| if asyncCommittingSession.Status != meta.GlobalStatusAsyncCommitting { |
| continue |
| } |
| _,err := coordinator.core.doGlobalCommit(asyncCommittingSession,true) |
| if err != nil { |
| logging.Logger.Infof("Failed to async committing [%s]",asyncCommittingSession.Xid) |
| } |
| } |
| } |
| |
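| // undoLogDelete sends an UndoLogDeleteRequest to every registered RM session, asking it to |
| // purge undo log records older than the configured number of save days. |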
| func (coordinator *DefaultCoordinator) undoLogDelete() { |
| saveDays := coordinator.conf.UndoConfig.LogSaveDays |
| for resourceId, rmSession := range SessionManager.GetRmSessions() { |
| deleteRequest := protocal.UndoLogDeleteRequest{ |
| ResourceId: resourceId, |
| SaveDays: saveDays, |
| } |
| err := coordinator.SendASyncRequest(rmSession, deleteRequest) |
| if err != nil { |
| logging.Logger.Errorf("Failed to async delete undo log resourceId = %s", resourceId) |
| } |
| } |
| } |
| |
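| // Stop halts all background tickers; the ticker goroutines themselves keep blocking on |
| // their stopped channels and are not terminated explicitly. |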
| func (coordinator *DefaultCoordinator) Stop() { |
| coordinator.timeoutCheckTicker.Stop() |
| coordinator.retryRollbackingTicker.Stop() |
| coordinator.retryCommittingTicker.Stop() |
| coordinator.asyncCommittingTicker.Stop() |
| coordinator.undoLogDeleteTicker.Stop() |
| } |