blob: 0feb5842dc257b75f254951a6429daf245ff7158 [file] [log] [blame]
package server
import (
"fmt"
"github.com/dk-lockdown/seata-golang/logging"
"github.com/dk-lockdown/seata-golang/meta"
"github.com/dk-lockdown/seata-golang/protocal"
"github.com/dk-lockdown/seata-golang/tc/event"
"github.com/dk-lockdown/seata-golang/tc/holder"
"github.com/dk-lockdown/seata-golang/tc/lock"
"github.com/dk-lockdown/seata-golang/tc/session"
"github.com/dk-lockdown/seata-golang/util"
)
/**
* +--------------------+-----------------------+--------------------+
* | |Method(InBound) |Method(OutBound) |
* +--------------------+-----------------------+--------------------+
* | |Begin | |
* | |BranchRegister | |
* | TC |BranchReport | |
* | (AT&TCC) |(GlobalReport) |branchCommit |
* | (DefaultCore) |Commit |branchRollback |
* | |Rollback | |
* | |GetStatus | |
* +--------------------+-----------------------+--------------------+
* | AT |LockQuery | |
* +--------------------+-----------------------+--------------------+
* | |doGlobalCommit | |
* | SAGA |doGlobalRollBack | |
* | |doGlobalReport | |
* +--------------------+-----------------------+--------------------+
*
* 参考 [effective go 之 Embedding](#https://my.oschina.net/pengfeix/blog/109967)
* Go does not provide the typical, type-driven notion of subclassing,
* but it does have the ability to “borrow” pieces of an implementation
* by embedding types within a struct or interface.
* Go 没有像其它面向对象语言中的类继承概念,但是,它可以通过在结构体或者接口中嵌入
* 其它的类型,来使用被嵌入类型的功能。
*
* 原本 JAVA 版 Seata Sever 设计了 Core 接口,AbstractCore 实现该接口,ATCore、
* TccCore、SagaCore 都继承 AbstractCore。使 ATCore、TccCore、SagaCore 每一
* 个类单独拿出来都是 Core 接口的实现。但 Go 版的 Seata 我不打算这样设计。我们将
* Core 接口里定义的接口方法拿出来,如上面的表格所示,一个全局事务的周期分别对应 Begin、
* BranchRegister、BranchReport、Commit、Rollback 接口方法,这些接口方法适用于
* AT 模式和 TCC 模式(SAGA 模式暂不了解,先不考虑)。AT 模式会多一个 LockQuery
* 的接口。另外 OutBound 方向上有两个接口 branchCommit、branchRollback。JAVA 版
* 的设计中 doGlobalCommit、doGlobalRollBack、doGlobalReport 其实是私有方法,
* 这里用首字母小些开头的方法区分。那么 Go 版本的 DefaultCore 设计就出来了(暂不考虑 SAGA),
* DefaultCore 内嵌入 ATCore。
*
*/
type AbstractCore struct {
MessageSender IServerMessageSender
}
type ATCore struct {
AbstractCore
}
type SAGACore struct {
AbstractCore
}
type DefaultCore struct {
AbstractCore
ATCore
SAGACore
coreMap map[meta.BranchType]interface{}
}
func NewCore(sender IServerMessageSender) ITransactionCoordinator {
return &DefaultCore{
AbstractCore: AbstractCore{ MessageSender: sender },
ATCore: ATCore{},
SAGACore: SAGACore{},
coreMap: make(map[meta.BranchType]interface{}),
}
}
func (core *ATCore) branchSessionLock(globalSession *session.GlobalSession,branchSession *session.BranchSession) error {
result,err :=lock.GetLockManager().AcquireLock(branchSession)
if err != nil {
return err
}
if !result {
return &meta.TransactionException{
Code: meta.TransactionExceptionCodeLockKeyConflict,
Message: fmt.Sprintf("Global lock acquire failed xid = %s branchId = %s",
globalSession.Xid,branchSession.BranchId),
}
}
return nil
}
func (core *ATCore) branchSessionUnlock(branchSession *session.BranchSession) error {
_, err := lock.GetLockManager().ReleaseLock(branchSession)
return err
}
func (core *ATCore) LockQuery(branchType meta.BranchType,
resourceId string,
xid string,
lockKeys string) bool {
return lock.GetLockManager().IsLockable(xid,resourceId,lockKeys)
}
func (core *SAGACore) doGlobalCommit(globalSession *session.GlobalSession, retrying bool) (bool, error) {
return true,nil
}
func (core *SAGACore) doGlobalRollback(globalSession *session.GlobalSession, retrying bool) (bool, error) {
return true,nil
}
func (core *SAGACore) doGlobalReport(globalSession *session.GlobalSession, xid string, param meta.GlobalStatus) error {
return nil
}
func (core *DefaultCore) Begin(applicationId string, transactionServiceGroup string, name string, timeout int32) (string, error) {
gs := session.NewGlobalSession().
SetApplicationId(applicationId).
SetTransactionServiceGroup(transactionServiceGroup).
SetTransactionName(name).
SetTimeout(timeout)
gs.Begin()
err := holder.GetSessionHolder().RootSessionManager.AddGlobalSession(gs)
if err != nil {
return "",err
}
evt := event.NewGlobalTransactionEvent(gs.TransactionId, event.RoleTC,gs.TransactionName,gs.BeginTime,0,gs.Status)
event.EventBus.GlobalTransactionEventChannel <- evt
logging.Logger.Infof("Successfully begin global transaction xid = {}",gs.Xid)
return gs.Xid, nil
}
func (core *DefaultCore) BranchRegister(branchType meta.BranchType,
resourceId string,
clientId string,
xid string,
applicationData []byte,
lockKeys string) (int64, error) {
gs,err := assertGlobalSessionNotNull(xid,false)
if err != nil {
return 0,err
}
defer gs.Unlock()
gs.Lock()
err1 := globalSessionStatusCheck(gs)
if err1 != nil {
return 0,err
}
bs := session.NewBranchSessionByGlobal(*gs,branchType,resourceId,applicationData,lockKeys,clientId)
if branchType == meta.BranchTypeAT {
core.ATCore.branchSessionLock(gs, bs)
}
gs.Add(bs)
logging.Logger.Infof("Successfully register branch xid = %s, branchId = %d",gs.Xid,bs.BranchId)
return bs.BranchId,nil
}
func globalSessionStatusCheck(globalSession *session.GlobalSession) error {
if !globalSession.Active {
return &meta.TransactionException{
Code: meta.TransactionExceptionCodeGlobalTransactionNotActive,
Message: fmt.Sprintf("Could not register branch into global session xid = %s status = %d",globalSession.Xid,globalSession.Status),
}
}
if globalSession.Status != meta.GlobalStatusBegin {
return &meta.TransactionException{
Code: meta.TransactionExceptionCodeGlobalTransactionStatusInvalid,
Message: fmt.Sprintf("Could not register branch into global session xid = %s status = %d while expecting %d",
globalSession.Xid,globalSession.Status,meta.GlobalStatusBegin),
}
}
return nil
}
func assertGlobalSessionNotNull(xid string, withBranchSessions bool) (*session.GlobalSession,error) {
gs := holder.GetSessionHolder().FindGlobalSessionWithBranchSessions(xid,withBranchSessions)
if gs == nil {
logging.Logger.Errorf("Could not found global transaction xid = %s",gs.Xid)
return nil,&meta.TransactionException{
Code: meta.TransactionExceptionCodeGlobalTransactionNotExist,
Message: fmt.Sprintf("Could not found global transaction xid = %s",gs.Xid),
}
}
return gs,nil
}
func (core *DefaultCore) BranchReport(branchType meta.BranchType,
xid string,
branchId int64,
status meta.BranchStatus,
applicationData []byte) error {
gs,err := assertGlobalSessionNotNull(xid,true)
if err != nil {
return nil
}
bs := gs.GetBranch(branchId)
if bs == nil {
return &meta.TransactionException{
Code: meta.TransactionExceptionCodeBranchTransactionNotExist,
Message: fmt.Sprintf("Could not found branch session xid = %s branchId = %d",
xid,branchId),
}
}
bs.Status = status
holder.GetSessionHolder().RootSessionManager.UpdateBranchSessionStatus(bs,status)
logging.Logger.Infof("Successfully branch report xid = %s, branchId = %d",xid,bs.BranchId)
return nil
}
func (core *DefaultCore) LockQuery(branchType meta.BranchType, resourceId string, xid string, lockKeys string) (bool, error) {
return true,nil
}
func (core *DefaultCore) branchCommit(globalSession *session.GlobalSession, branchSession *session.BranchSession) (meta.BranchStatus, error) {
request := protocal.BranchCommitRequest{}
request.Xid = branchSession.Xid
request.BranchId = branchSession.BranchId
request.ResourceId = branchSession.ResourceId
request.ApplicationData = branchSession.ApplicationData
request.BranchType = branchSession.BranchType
resp, err := core.branchCommitSend(request,globalSession,branchSession)
if err != nil {
return 0,&meta.TransactionException{
Code: meta.TransactionExceptionCodeBranchTransactionNotExist,
Message: fmt.Sprintf("Send branch commit failed, xid = %s branchId = %d",
branchSession.Xid,branchSession.BranchId),
}
}
return resp,err
}
func (core *DefaultCore) branchCommitSend(request protocal.BranchCommitRequest,
globalSession *session.GlobalSession, branchSession *session.BranchSession) (meta.BranchStatus,error) {
resp,err := core.MessageSender.SendSyncRequest(branchSession.ResourceId,branchSession.ClientId,request)
if err != nil {
return 0,err
}
response := resp.(protocal.BranchCommitResponse)
return response.BranchStatus, nil
}
func (core *DefaultCore) branchRollback(globalSession *session.GlobalSession, branchSession *session.BranchSession) (meta.BranchStatus, error) {
request := protocal.BranchRollbackRequest{}
request.Xid = branchSession.Xid
request.BranchId = branchSession.BranchId
request.ResourceId = branchSession.ResourceId
request.ApplicationData = branchSession.ApplicationData
request.BranchType = branchSession.BranchType
resp, err := core.branchRollbackSend(request,globalSession,branchSession)
if err != nil {
return 0,&meta.TransactionException{
Code: meta.TransactionExceptionCodeBranchTransactionNotExist,
Message: fmt.Sprintf("Send branch rollback failed, xid = %s branchId = %d",
branchSession.Xid,branchSession.BranchId),
}
}
return resp,err
}
func (core *DefaultCore) branchRollbackSend(request protocal.BranchRollbackRequest,
globalSession *session.GlobalSession, branchSession *session.BranchSession) (meta.BranchStatus,error) {
resp,err := core.MessageSender.SendSyncRequest(branchSession.ResourceId,branchSession.ClientId,request)
if err != nil {
return 0,err
}
response := resp.(protocal.BranchRollbackResponse)
return response.BranchStatus, nil
}
func (core *DefaultCore) Commit(xid string) (meta.GlobalStatus, error) {
globalSession := holder.GetSessionHolder().RootSessionManager.FindGlobalSession(xid)
if globalSession == nil {
return meta.GlobalStatusFinished, nil
}
shouldCommit := func (gs *session.GlobalSession) bool {
gs.Lock()
defer gs.Unlock()
if gs.Active {
gs.Active = false
}
lock.GetLockManager().ReleaseGlobalSessionLock(gs)
if gs.Status == meta.GlobalStatusBegin {
changeGlobalSessionStatus(gs,meta.GlobalStatusCommitting)
return true
}
return false
}(globalSession)
if !shouldCommit {
return globalSession.Status,nil
}
if globalSession.CanBeCommittedAsync() {
asyncCommit(globalSession)
return meta.GlobalStatusCommitted, nil
} else {
_,err := core.doGlobalCommit(globalSession,false)
if err != nil {
return 0,err
}
}
return globalSession.Status,nil
}
func (core *DefaultCore) doGlobalCommit(globalSession *session.GlobalSession, retrying bool) (bool, error) {
var (
success = true
err error
)
evt := event.NewGlobalTransactionEvent(globalSession.TransactionId, event.RoleTC,globalSession.TransactionName,globalSession.BeginTime,0,globalSession.Status)
event.EventBus.GlobalTransactionEventChannel <- evt
if globalSession.IsSaga() {
success,err = core.SAGACore.doGlobalCommit(globalSession,retrying)
} else {
for _,bs := range globalSession.GetSortedBranches() {
if bs.Status == meta.BranchStatusPhaseoneFailed {
removeBranchSession(globalSession,bs)
continue
}
branchStatus,err1 := core.branchCommit(globalSession,bs)
if err1 != nil {
logging.Logger.Errorf("Exception committing branch %v", bs)
if !retrying {
queueToRetryCommit(globalSession)
}
return false,err1
}
switch branchStatus {
case meta.BranchStatusPhasetwoCommitted:
removeBranchSession(globalSession,bs)
continue
case meta.BranchStatusPhasetwoCommitFailedUnretryable:
{
// 二阶段提交失败且不能 Retry,不能异步提交,则移除 GlobalSession,Why?
if globalSession.CanBeCommittedAsync() {
logging.Logger.Errorf("By [%s], failed to commit branch %v",bs.Status.String(),bs)
continue
} else {
endCommitFailed(globalSession)
logging.Logger.Errorf("Finally, failed to commit global[%d] since branch[%d] commit failed",globalSession.Xid,bs.BranchId)
return false,nil
}
}
default:
{
if !retrying {
queueToRetryCommit(globalSession)
return false,nil
}
if globalSession.CanBeCommittedAsync() {
logging.Logger.Errorf("By [%s], failed to commit branch %v",bs.Status.String(),bs)
continue
} else {
logging.Logger.Errorf("ResultCodeFailed to commit global[%d] since branch[%d] commit failed, will retry later.",globalSession.Xid,bs.BranchId)
return false,nil
}
}
}
}
if globalSession.HasBranch() {
logging.Logger.Infof("Global[%d] committing is NOT done.", globalSession.Xid)
return false,nil
}
}
if success {
endCommitted(globalSession)
evt := event.NewGlobalTransactionEvent(globalSession.TransactionId, event.RoleTC,globalSession.TransactionName,globalSession.BeginTime,
int64(util.CurrentTimeMillis()),globalSession.Status)
event.EventBus.GlobalTransactionEventChannel <- evt
logging.Logger.Infof("Global[%d] committing is successfully done.", globalSession.Xid)
}
return success,err
}
func (core *DefaultCore) Rollback(xid string) (meta.GlobalStatus, error) {
globalSession := holder.GetSessionHolder().RootSessionManager.FindGlobalSession(xid)
if globalSession == nil {
return meta.GlobalStatusFinished, nil
}
shouldRollBack := func (gs *session.GlobalSession) bool {
gs.Lock()
defer gs.Unlock()
if gs.Active {
gs.Active = false // Highlight: Firstly, close the session, then no more branch can be registered.
}
lock.GetLockManager().ReleaseGlobalSessionLock(gs)
if gs.Status == meta.GlobalStatusBegin {
changeGlobalSessionStatus(gs,meta.GlobalStatusRollbacking)
return true
}
return false
}(globalSession)
if !shouldRollBack {
return globalSession.Status,nil
}
core.doGlobalRollback(globalSession,false)
return globalSession.Status,nil
}
func (core *DefaultCore) doGlobalRollback(globalSession *session.GlobalSession, retrying bool) (bool, error) {
var (
success = true
err error
)
evt := event.NewGlobalTransactionEvent(globalSession.TransactionId, event.RoleTC,globalSession.TransactionName,globalSession.BeginTime, 0,globalSession.Status)
event.EventBus.GlobalTransactionEventChannel <- evt
if globalSession.IsSaga() {
success,err = core.SAGACore.doGlobalRollback(globalSession,retrying)
} else {
for _,bs := range globalSession.GetSortedBranches() {
if bs.Status == meta.BranchStatusPhaseoneFailed {
removeBranchSession(globalSession, bs)
continue
}
branchStatus,err1 := core.branchRollback(globalSession,bs)
if err1 != nil {
logging.Logger.Errorf("Exception rollbacking branch xid=%d branchId=%d", globalSession.Xid,bs.BranchId)
if !retrying {
queueToRetryRollback(globalSession)
}
return false,err1
}
switch branchStatus {
case meta.BranchStatusPhasetwoRollbacked:
removeBranchSession(globalSession,bs)
logging.Logger.Infof("Successfully rollback branch xid=%d branchId=%d", globalSession.Xid,bs.BranchId)
continue
case meta.BranchStatusPhasetwoRollbackFailedUnretryable:
endRollBackFailed(globalSession)
logging.Logger.Infof("ResultCodeFailed to rollback branch and stop retry xid=%d branchId=%d",globalSession.Xid,bs.BranchId)
return false,nil
default:
logging.Logger.Infof("ResultCodeFailed to rollback branch xid=%d branchId=%d", globalSession.Xid,bs.BranchId)
if !retrying {
queueToRetryRollback(globalSession)
}
return false,nil
}
}
// In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch
// transaction registration when rolling back.
// 1. New branch transaction and rollback branch transaction have no data association
// 2. New branch transaction has data association with rollback branch transaction
// The second query can solve the first problem, and if it is the second problem, it may cause a rollback
// failure due to data changes.
gs := holder.GetSessionHolder().RootSessionManager.FindGlobalSession(globalSession.Xid)
if gs != nil && gs.HasBranch() {
logging.Logger.Infof("Global[%d] rollbacking is NOT done.", globalSession.Xid)
return false, nil
}
}
if success {
endRollbacked(globalSession)
evt := event.NewGlobalTransactionEvent(globalSession.TransactionId, event.RoleTC,globalSession.TransactionName,globalSession.BeginTime,
int64(util.CurrentTimeMillis()),globalSession.Status)
event.EventBus.GlobalTransactionEventChannel <- evt
logging.Logger.Infof("Successfully rollback global, xid = %d", globalSession.Xid)
}
return success,err
}
func (core *DefaultCore) GetStatus(xid string) (meta.GlobalStatus, error) {
gs := holder.GetSessionHolder().RootSessionManager.FindGlobalSession(xid)
if gs == nil {
return meta.GlobalStatusFinished,nil
} else {
return gs.Status,nil
}
}
func (core *DefaultCore) GlobalReport(xid string, globalStatus meta.GlobalStatus) (meta.GlobalStatus, error) {
gs := holder.GetSessionHolder().RootSessionManager.FindGlobalSession(xid)
if gs == nil {
return globalStatus,nil
}
core.doGlobalReport(gs,xid,globalStatus)
return gs.Status,nil
}
func (core *DefaultCore) doGlobalReport(globalSession *session.GlobalSession,xid string,globalStatus meta.GlobalStatus) error {
if globalSession.IsSaga() {
return core.SAGACore.doGlobalReport(globalSession,xid,globalStatus)
}
return nil
}
func endRollbacked(globalSession *session.GlobalSession) {
if isTimeoutGlobalStatus(globalSession.Status) {
changeGlobalSessionStatus(globalSession,meta.GlobalStatusTimeoutRollbacked)
} else {
changeGlobalSessionStatus(globalSession,meta.GlobalStatusRollbacked)
}
lock.GetLockManager().ReleaseGlobalSessionLock(globalSession)
holder.GetSessionHolder().RootSessionManager.RemoveGlobalSession(globalSession)
}
func endRollBackFailed(globalSession *session.GlobalSession) {
if isTimeoutGlobalStatus(globalSession.Status) {
changeGlobalSessionStatus(globalSession,meta.GlobalStatusTimeoutRollbackFailed)
} else {
changeGlobalSessionStatus(globalSession,meta.GlobalStatusRollbackFailed)
}
lock.GetLockManager().ReleaseGlobalSessionLock(globalSession)
holder.GetSessionHolder().RootSessionManager.RemoveGlobalSession(globalSession)
}
func queueToRetryRollback(globalSession *session.GlobalSession) {
holder.GetSessionHolder().RetryRollbackingSessionManager.AddGlobalSession(globalSession)
if isTimeoutGlobalStatus(globalSession.Status) {
changeGlobalSessionStatus(globalSession,meta.GlobalStatusTimeoutRollbackRetrying)
} else {
changeGlobalSessionStatus(globalSession,meta.GlobalStatusRollbackRetrying)
}
}
func isTimeoutGlobalStatus(status meta.GlobalStatus) bool {
return status == meta.GlobalStatusTimeoutRollbacked ||
status == meta.GlobalStatusTimeoutRollbackFailed ||
status == meta.GlobalStatusTimeoutRollbacking ||
status == meta.GlobalStatusTimeoutRollbackRetrying
}
func endCommitted(globalSession *session.GlobalSession) {
changeGlobalSessionStatus(globalSession,meta.GlobalStatusCommitted)
lock.GetLockManager().ReleaseGlobalSessionLock(globalSession)
holder.GetSessionHolder().RootSessionManager.RemoveGlobalSession(globalSession)
}
func queueToRetryCommit(globalSession *session.GlobalSession) {
holder.GetSessionHolder().RetryCommittingSessionManager.AddGlobalSession(globalSession)
changeGlobalSessionStatus(globalSession,meta.GlobalStatusCommitRetrying)
}
func endCommitFailed(globalSession *session.GlobalSession) {
changeGlobalSessionStatus(globalSession,meta.GlobalStatusCommitFailed)
lock.GetLockManager().ReleaseGlobalSessionLock(globalSession)
holder.GetSessionHolder().RootSessionManager.RemoveGlobalSession(globalSession)
}
func asyncCommit(globalSession *session.GlobalSession) {
holder.GetSessionHolder().AsyncCommittingSessionManager.AddGlobalSession(globalSession)
changeGlobalSessionStatus(globalSession,meta.GlobalStatusAsyncCommitting)
}
func changeGlobalSessionStatus(globalSession *session.GlobalSession, status meta.GlobalStatus) {
globalSession.Status = status
holder.GetSessionHolder().RootSessionManager.UpdateGlobalSessionStatus(globalSession,meta.GlobalStatusAsyncCommitting)
}
func removeBranchSession(globalSession *session.GlobalSession,branchSession *session.BranchSession) {
lock.GetLockManager().ReleaseLock(branchSession)
globalSession.Remove(branchSession)
holder.GetSessionHolder().RootSessionManager.RemoveBranchSession(globalSession,branchSession)
}