| package holder |
| |
| import ( |
| "github.com/dk-lockdown/seata-golang/logging" |
| "github.com/dk-lockdown/seata-golang/meta" |
| "github.com/dk-lockdown/seata-golang/tc/config" |
| "github.com/dk-lockdown/seata-golang/tc/session" |
| "github.com/dk-lockdown/seata-golang/util" |
| ) |
| |
| type Reloadable interface { |
| /** |
| * Reload states. |
| */ |
| Reload() |
| } |
| |
| type FileBasedSessionManager struct { |
| conf config.FileStoreConfig |
| DefaultSessionManager |
| } |
| |
| func NewFileBasedSessionManager(conf config.FileStoreConfig) ISessionManager { |
| transactionStoreManager := &FileTransactionStoreManager{} |
| transactionStoreManager.InitFile(conf.FileDir) |
| sessionManager := DefaultSessionManager{ |
| AbstractSessionManager: AbstractSessionManager{ |
| TransactionStoreManager: transactionStoreManager, |
| Name: conf.FileDir, |
| }, |
| SessionMap: make(map[string]*session.GlobalSession), |
| } |
| transactionStoreManager.SessionManager = &sessionManager |
| return &FileBasedSessionManager{ |
| conf: conf, |
| DefaultSessionManager: sessionManager, |
| } |
| } |
| |
| func (sessionManager *FileBasedSessionManager) Reload() { |
| sessionManager.restoreSessions() |
| sessionManager.washSessions() |
| } |
| |
| func (sessionManager *FileBasedSessionManager) restoreSessions() { |
| unhandledBranchBuffer := make(map[int64]*session.BranchSession) |
| sessionManager.restoreSessionsToUnhandledBranchBuffer(true,unhandledBranchBuffer) |
| sessionManager.restoreSessionsToUnhandledBranchBuffer(false,unhandledBranchBuffer) |
| if len(unhandledBranchBuffer) > 0 { |
| for _,branchSession := range unhandledBranchBuffer { |
| found := sessionManager.SessionMap[branchSession.Xid] |
| if found == nil { |
| logging.Logger.Infof("GlobalSession Does Not Exists For BranchSession [%d/%s]",branchSession.BranchId,branchSession.Xid) |
| } else { |
| existingBranch := found.GetBranch(branchSession.BranchId) |
| if existingBranch == nil { |
| found.Add(branchSession) |
| } else { |
| existingBranch.Status = branchSession.Status |
| } |
| } |
| } |
| } |
| } |
| |
| func (sessionManager *FileBasedSessionManager) restoreSessionsToUnhandledBranchBuffer(isHistory bool,unhandledBranchSessions map[int64]*session.BranchSession) { |
| transactionStoreManager, ok := sessionManager.TransactionStoreManager.(ReloadableStore) |
| if !ok { |
| return |
| } |
| for { |
| if transactionStoreManager.HasRemaining(isHistory) { |
| stores := transactionStoreManager.ReadWriteStore(sessionManager.conf.SessionReloadReadSize,isHistory) |
| sessionManager.restore(stores,unhandledBranchSessions) |
| } else { |
| break |
| } |
| } |
| } |
| |
| func (sessionManager *FileBasedSessionManager) washSessions() { |
| if len(sessionManager.SessionMap) > 0 { |
| for _,globalSession := range sessionManager.SessionMap { |
| switch globalSession.Status { |
| case meta.GlobalStatusUnknown: |
| case meta.GlobalStatusCommitted: |
| case meta.GlobalStatusCommitFailed: |
| case meta.GlobalStatusRollbacked: |
| case meta.GlobalStatusRollbackFailed: |
| case meta.GlobalStatusTimeoutRollbacked: |
| case meta.GlobalStatusTimeoutRollbackFailed: |
| case meta.GlobalStatusFinished: |
| // Remove all sessions finished |
| delete(sessionManager.SessionMap, globalSession.Xid) |
| break |
| default: |
| break |
| } |
| } |
| } |
| } |
| |
| func (sessionManager *FileBasedSessionManager) restore(stores []*TransactionWriteStore, unhandledBranchSessions map[int64]*session.BranchSession) { |
| maxRecoverId := util.UUID |
| for _,store := range stores { |
| logOperation := store.LogOperation |
| sessionStorable := store.SessionRequest |
| maxRecoverId = getMaxId(maxRecoverId, sessionStorable) |
| switch logOperation { |
| case LogOperationGlobalAdd: |
| case LogOperationGlobalUpdate: |
| { |
| globalSession := sessionStorable.(*session.GlobalSession) |
| if globalSession.TransactionId == int64(0) { |
| logging.Logger.Errorf("Restore globalSession from file failed, the transactionId is zero , xid:%s", globalSession.Xid) |
| break |
| } |
| foundGlobalSession := sessionManager.SessionMap[globalSession.Xid] |
| if foundGlobalSession == nil { |
| sessionManager.SessionMap[globalSession.Xid] = globalSession |
| } else { |
| foundGlobalSession.Status = globalSession.Status |
| } |
| break |
| } |
| case LogOperationGlobalRemove: |
| { |
| globalSession := sessionStorable.(*session.GlobalSession) |
| if globalSession.TransactionId == int64(0) { |
| logging.Logger.Errorf("Restore globalSession from file failed, the transactionId is zero , xid:%s", globalSession.Xid) |
| break |
| } |
| delete(sessionManager.SessionMap, globalSession.Xid) |
| break |
| } |
| case LogOperationBranchAdd: |
| case LogOperationBranchUpdate: |
| { |
| branchSession := sessionStorable.(*session.BranchSession) |
| if branchSession.TransactionId == int64(0) { |
| logging.Logger.Errorf("Restore branchSession from file failed, the transactionId is zero , xid:%s", branchSession.Xid) |
| break |
| } |
| foundGlobalSession := sessionManager.SessionMap[branchSession.Xid] |
| if foundGlobalSession == nil { |
| unhandledBranchSessions[branchSession.BranchId] = branchSession |
| } else { |
| existingBranch := foundGlobalSession.GetBranch(branchSession.BranchId) |
| if existingBranch == nil { |
| foundGlobalSession.Add(branchSession) |
| } else { |
| existingBranch.Status = branchSession.Status |
| } |
| } |
| break |
| } |
| case LogOperationBranchRemove: |
| { |
| branchSession := sessionStorable.(*session.BranchSession) |
| if branchSession.TransactionId == int64(0) { |
| logging.Logger.Errorf("Restore branchSession from file failed, the transactionId is zero , xid:%s", branchSession.Xid) |
| break |
| } |
| foundGlobalSession := sessionManager.SessionMap[branchSession.Xid] |
| if foundGlobalSession == nil { |
| logging.Logger.Infof("GlobalSession To Be Updated (Remove Branch) Does Not Exists [%d/%s]",branchSession.BranchId,branchSession.Xid) |
| } else { |
| existingBranch := foundGlobalSession.GetBranch(branchSession.BranchId) |
| if existingBranch == nil { |
| logging.Logger.Infof("BranchSession To Be Updated Does Not Exists [%d/%s]",branchSession.BranchId,branchSession.Xid) |
| } else { |
| foundGlobalSession.Remove(existingBranch) |
| } |
| } |
| break |
| } |
| default: |
| break |
| } |
| } |
| setMaxId(maxRecoverId) |
| } |
| |
| func getMaxId(maxRecoverId int64, sessionStorable session.SessionStorable) int64 { |
| var currentId int64 = 0 |
| var gs, ok1 = sessionStorable.(*session.GlobalSession) |
| if ok1 { |
| currentId = gs.TransactionId |
| } |
| |
| var bs, ok2 = sessionStorable.(*session.BranchSession) |
| if ok2 { |
| currentId = bs.BranchId |
| } |
| |
| if maxRecoverId > currentId { |
| return maxRecoverId |
| } else { |
| return currentId |
| } |
| } |
| |
| func setMaxId(maxRecoverId int64) { |
| var currentId int64 |
| // will be recover multi-thread later |
| for{ |
| currentId = util.UUID |
| if currentId < maxRecoverId { |
| if util.SetUUID(currentId,maxRecoverId) { |
| break |
| } |
| } |
| break |
| } |
| } |