blob: f423bd5195df41b2f9189502cba841e996805be3 [file] [log] [blame]
package lock
import (
"github.com/dk-lockdown/seata-golang/logging"
"github.com/dk-lockdown/seata-golang/model"
"github.com/dk-lockdown/seata-golang/tc/session"
"github.com/dk-lockdown/seata-golang/util"
"github.com/pkg/errors"
"strconv"
"sync"
"sync/atomic"
)
const BucketPerTable = 128
type MemoryLocker struct {
LockMap *sync.Map
// 高流量下,锁资源越多,BucketHolder 的性能越下降
BucketHolder *sync.Map
LockKeyCount int64
}
func (ml *MemoryLocker) AcquireLock(branchSession *session.BranchSession) (bool, error) {
if branchSession == nil {
logging.Logger.Errorf("branchSession can't be null for memory/file locker.")
return false, errors.New("branchSession can't be null for memory/file locker.")
}
lockKey := branchSession.LockKey
if lockKey == "" {
return true,nil
}
locks := collectRowLocksByBranchSession(branchSession)
if locks == nil { return true,nil }
return ml.acquireLockByRowLocks(branchSession,locks)
}
func (ml *MemoryLocker) ReleaseLock(branchSession *session.BranchSession) (bool, error) {
if branchSession == nil {
logging.Logger.Info("branchSession can't be null for memory/file locker.")
return false,errors.New("branchSession can't be null for memory/file locker")
}
locks := collectRowLocksByBranchSession(branchSession)
return ml.releaseLockByRowLocks(branchSession,locks)
}
func (ml *MemoryLocker) ReleaseGlobalSessionLock(globalSession *session.GlobalSession) (bool, error) {
branchSessions := globalSession.GetSortedBranches()
releaseLockResult := true
for _,branchSession := range branchSessions {
ok, err := ml.ReleaseLock(branchSession)
if err != nil {
return ok,err
}
if !ok { releaseLockResult = false }
}
return releaseLockResult, nil
}
func (ml *MemoryLocker) IsLockable(xid string, resourceId string, lockKey string) bool {
locks := collectRowLocksByLockKeyResourceIdXid(lockKey, resourceId, xid)
return ml.isLockableByRowLocks(locks)
}
func (ml *MemoryLocker) CleanAllLocks() {
ml.LockMap = &sync.Map{}
ml.BucketHolder = &sync.Map{}
ml.LockKeyCount = 0
}
func (ml *MemoryLocker) GetLockKeyCount() int64 {
return ml.LockKeyCount
}
// AcquireLock 申请锁资源,resourceId -> tableName -> bucketId -> pk -> transactionId
func (ml *MemoryLocker) acquireLockByRowLocks(branchSession *session.BranchSession,rowLocks []*RowLock) (bool, error) {
if rowLocks == nil { return true, nil }
resourceId := branchSession.ResourceId
transactionId := branchSession.TransactionId
dbLockMap,_ := ml.LockMap.LoadOrStore(resourceId,&sync.Map{})
cDbLockMap := dbLockMap.(*sync.Map)
for _, rowLock := range rowLocks {
tableLockMap,_ := cDbLockMap.LoadOrStore(rowLock.TableName,&sync.Map{})
cTableLockMap := tableLockMap.(*sync.Map)
bucketId := util.String(rowLock.Pk) % BucketPerTable
bucketKey := strconv.Itoa(bucketId)
bucketLockMap,_ := cTableLockMap.LoadOrStore(bucketKey,&sync.Map{})
cBucketLockMap := bucketLockMap.(*sync.Map)
previousLockTransactionId,loaded := cBucketLockMap.LoadOrStore(rowLock.Pk, transactionId)
if !loaded {
//No existing rowLock, and now locked by myself
keysInHolder,_ := ml.BucketHolder.LoadOrStore(cBucketLockMap, model.NewSet())
sKeysInHolder := keysInHolder.(*model.Set)
sKeysInHolder.Add(rowLock.Pk)
atomic.AddInt64(&ml.LockKeyCount,1)
} else if previousLockTransactionId == transactionId {
// Locked by me before
continue
} else {
logging.Logger.Infof("Global rowLock on [%s:%s] is holding by %d", rowLock.TableName, rowLock.Pk,previousLockTransactionId)
// branchSession unlock
_,err := ml.ReleaseLock(branchSession)
return false,err
}
}
return true, nil
}
func (ml *MemoryLocker) releaseLockByRowLocks(branchSession *session.BranchSession,rowLocks []*RowLock) (bool,error) {
if rowLocks == nil { return false, nil }
releaseLock := func (key, value interface{}) bool {
cBucketLockMap := key.(*sync.Map)
keys := value.(*model.Set)
for _, key := range keys.List() {
transId, ok := cBucketLockMap.Load(key)
if ok && transId == branchSession.TransactionId {
cBucketLockMap.Delete(key)
// keys.List() 是一个新的 slice,移除 key 并不会导致错误发生
keys.Remove(key)
atomic.AddInt64(&ml.LockKeyCount,-1)
}
}
return true
}
ml.BucketHolder.Range(releaseLock)
return true, nil
}
func (ml *MemoryLocker) isLockableByRowLocks(rowLocks []*RowLock) bool {
if rowLocks == nil { return true }
resourceId := rowLocks[0].ResourceId
transactionId := rowLocks[0].TransactionId
dbLockMap, ok := ml.LockMap.Load(resourceId)
if !ok {
return true
}
cDbLockMap := dbLockMap.(*sync.Map)
for _, rowLock := range rowLocks {
tableLockMap,ok := cDbLockMap.Load(rowLock.TableName)
if !ok {
continue
}
cTableLockMap := tableLockMap.(*sync.Map)
bucketId := util.String(rowLock.Pk) % BucketPerTable
bucketKey := strconv.Itoa(bucketId)
bucketLockMap,ok := cTableLockMap.Load(bucketKey)
if !ok {
continue
}
cBucketLockMap := bucketLockMap.(*sync.Map)
previousLockTransactionId,ok := cBucketLockMap.Load(rowLock.Pk)
if !ok || previousLockTransactionId == transactionId {
// Locked by me before
continue
} else {
logging.Logger.Infof("Global rowLock on [%s:%s] is holding by %d", rowLock.TableName, rowLock.Pk,previousLockTransactionId)
return false
}
}
return true
}