blob: 9f9a69f5c54af411cd398a2ebf48042aef9c552f [file] [log] [blame]
package holder
import (
"os"
"github.com/dk-lockdown/seata-golang/logging"
"github.com/dk-lockdown/seata-golang/tc/model"
"github.com/dk-lockdown/seata-golang/tc/session"
"github.com/dk-lockdown/seata-golang/util"
"strings"
"sync"
"sync/atomic"
"vimagination.zapto.org/byteio"
)
var FileTrxNum int64 = 0
var PerFileBlockSize int64 = 65535 * 8
var HisDataFilenamePostfix = ".1"
var MaxTrxTimeoutMills int64 = 30 * 60 * 1000
type ReloadableStore interface {
/**
* Read write holder.
*
* @param readSize the read size
* @param isHistory the is history
* @return the list
*/
ReadWriteStore(readSize int, isHistory bool) []*TransactionWriteStore
/**
* Has remaining boolean.
*
* @param isHistory the is history
* @return the boolean
*/
HasRemaining(isHistory bool) bool
}
type FileTransactionStoreManager struct {
SessionManager ISessionManager
currFullFileName string
hisFullFileName string
currFileChannel *os.File
LastModifiedTime int64
TrxStartTimeMills int64
sync.Mutex
recoverHisOffset int64
recoverCurrOffset int64
}
func (storeManager * FileTransactionStoreManager) InitFile(fullFileName string) {
storeManager.currFullFileName = fullFileName
storeManager.hisFullFileName = fullFileName + HisDataFilenamePostfix
storeManager.currFileChannel,_ = os.OpenFile(fullFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0777)
storeManager.TrxStartTimeMills = int64(util.CurrentTimeMillis())
}
func (storeManager * FileTransactionStoreManager) writeDataFrame(data []byte) {
dataLength := uint32(len(data))
dataLengthBytes := [4]byte{
byte(dataLength >> 24),
byte(dataLength >> 16),
byte(dataLength >> 8),
byte(dataLength),
}
storeManager.currFileChannel.Write(dataLengthBytes[:4])
storeManager.currFileChannel.Write(data)
}
func (storeManager * FileTransactionStoreManager) WriteSession(logOperation LogOperation, session session.SessionStorable) bool {
storeManager.Lock()
defer storeManager.Unlock()
var curFileTrxNum int64 = 0
data, err := (&TransactionWriteStore{
SessionRequest: session,
LogOperation: logOperation,
}).Encode()
if err != nil {
logging.Logger.Info(err.Error())
return false
}
storeManager.writeDataFrame(data)
storeManager.LastModifiedTime = int64(util.CurrentTimeMillis())
curFileTrxNum = atomic.AddInt64(&FileTrxNum,1)
if curFileTrxNum %PerFileBlockSize == 0 &&
int64(util.CurrentTimeMillis()) - storeManager.TrxStartTimeMills > MaxTrxTimeoutMills {
storeManager.saveHistory()
}
return true
}
func (storeManager * FileTransactionStoreManager) ReadSession(xid string) *session.GlobalSession {
return nil
}
func (storeManager * FileTransactionStoreManager) ReadSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession {
return nil
}
func (storeManager * FileTransactionStoreManager) ReadSessionWithSessionCondition(sessionCondition model.SessionCondition) []*session.GlobalSession {
return nil
}
func (storeManager * FileTransactionStoreManager) Shutdown() {
storeManager.currFileChannel.Close()
}
func (storeManager * FileTransactionStoreManager) GetCurrentMaxSessionId() int64{
return int64(0)
}
func (storeManager * FileTransactionStoreManager) saveHistory() {
storeManager.findTimeoutAndSave()
os.Rename(storeManager.currFullFileName,storeManager.hisFullFileName)
storeManager.InitFile(storeManager.currFullFileName)
}
func (storeManager * FileTransactionStoreManager) findTimeoutAndSave() (bool,error) {
globalSessionsOverMaxTimeout := storeManager.SessionManager.FindGlobalSessions(model.SessionCondition{OverTimeAliveMills: MaxTrxTimeoutMills})
if globalSessionsOverMaxTimeout == nil {
return true,nil
}
for _, globalSession := range globalSessionsOverMaxTimeout {
globalWriteStore := &TransactionWriteStore{
SessionRequest: globalSession,
LogOperation: LogOperationGlobalAdd,
}
data,err := globalWriteStore.Encode()
if err != nil {
return false,err
}
storeManager.writeDataFrame(data)
branchSessIonsOverMaXTimeout := globalSession.GetSortedBranches()
if len(branchSessIonsOverMaXTimeout) > 0 {
for _,branchSession := range branchSessIonsOverMaXTimeout {
branchWriteStore := &TransactionWriteStore{
SessionRequest: branchSession,
LogOperation: LogOperationBranchAdd,
}
data,err := branchWriteStore.Encode()
if err != nil {
return false,err
}
storeManager.writeDataFrame(data)
}
}
}
return true,nil
}
func (storeManager * FileTransactionStoreManager) ReadWriteStore(readSize int, isHistory bool) []*TransactionWriteStore {
var (
file *os.File
currentOffset int64
)
if isHistory {
file, _ = os.OpenFile(storeManager.hisFullFileName, os.O_RDWR|os.O_CREATE, 0777)
currentOffset = storeManager.recoverHisOffset
} else {
file, _ = os.OpenFile(storeManager.currFullFileName, os.O_RDWR|os.O_CREATE, 0777)
currentOffset = storeManager.recoverCurrOffset
}
return storeManager.parseDataFile(file,readSize,currentOffset)
}
func (storeManager * FileTransactionStoreManager) HasRemaining(isHistory bool) bool {
var (
file *os.File
currentOffset int64
)
if isHistory {
file, _ = os.OpenFile(storeManager.hisFullFileName, os.O_RDWR|os.O_CREATE, 0777)
currentOffset = storeManager.recoverHisOffset
} else {
file, _ = os.OpenFile(storeManager.currFullFileName, os.O_RDWR|os.O_CREATE, 0777)
currentOffset = storeManager.recoverCurrOffset
}
defer file.Close()
fi,_ := file.Stat()
return currentOffset < fi.Size()
}
func (storeManager * FileTransactionStoreManager) parseDataFile(file *os.File, readSize int, currentOffset int64) []*TransactionWriteStore {
defer file.Close()
var result = make([]*TransactionWriteStore,0)
fi,_ := file.Stat()
fileSize := fi.Size()
reader := byteio.BigEndianReader{Reader:file}
offset := currentOffset
for {
if offset >= fileSize {
break
}
file.Seek(offset, 0)
dataLength, n, _ := reader.ReadUint32()
if n < 4 {
break
}
data := make([]byte, int(dataLength))
length, _ := reader.Read(data)
offset += int64(length + 4)
if length == int(dataLength) {
st := &TransactionWriteStore{}
st.Decode(data)
result = append(result, st)
if len(result) == readSize {
break
}
} else if length == 0 {
break
}
}
if isHisFile(fi.Name()) {
storeManager.recoverHisOffset = offset
} else {
storeManager.recoverCurrOffset = offset
}
return result
}
func isHisFile(path string) bool {
return strings.HasSuffix(path,HisDataFilenamePostfix)
}