blob: 30fa486eaf68d109d4ac4eea019cea13cb0dc920 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"io/fs"
"os"
"path"
"path/filepath"
"strings"
"sync"
log "github.com/go-chassis/openlog"
)
var FileRootPath = "/data/kvs"
var NewstKVFile = "newest_version.json"
var MutexMap = make(map[string]*sync.RWMutex)
var mutexMapLock = &sync.Mutex{}
var rollbackMutexLock = &sync.Mutex{}
var createDirMutexLock = &sync.Mutex{}
type SchemaDAO struct{}
type FileDoRecord struct {
filepath string
content []byte
}
func GetOrCreateMutex(path string) *sync.RWMutex {
mutexMapLock.Lock()
mutex, ok := MutexMap[path]
if !ok {
mutex = &sync.RWMutex{}
MutexMap[path] = mutex
}
mutexMapLock.Unlock()
return mutex
}
func ExistDir(path string) error {
_, err := os.ReadDir(path)
if err != nil {
// create the dir if not exist
if os.IsNotExist(err) {
createDirMutexLock.Lock()
defer createDirMutexLock.Unlock()
err = os.MkdirAll(path, fs.ModePerm)
if err != nil {
log.Error("failed to make dir: " + path + " " + err.Error())
return err
}
return nil
}
log.Error("failed to read dir: " + path + " " + err.Error())
return err
}
return nil
}
func MoveDir(srcDir string, dstDir string) (err error) {
srcMutex := GetOrCreateMutex(srcDir)
dstMutex := GetOrCreateMutex(dstDir)
srcMutex.Lock()
dstMutex.Lock()
defer srcMutex.Unlock()
defer dstMutex.Unlock()
var movedFiles []string
files, err := os.ReadDir(srcDir)
if err != nil {
log.Error("move schema files failed " + err.Error())
return err
}
for _, file := range files {
err = ExistDir(dstDir)
if err != nil {
log.Error("move schema files failed " + err.Error())
return err
}
srcFile := filepath.Join(srcDir, file.Name())
dstFile := filepath.Join(dstDir, file.Name())
err = os.Rename(srcFile, dstFile)
if err != nil {
log.Error("move schema files failed " + err.Error())
break
}
movedFiles = append(movedFiles, file.Name())
}
if err != nil {
log.Error("Occur error when move schema files, begain rollback... " + err.Error())
for _, fileName := range movedFiles {
srcFile := filepath.Join(srcDir, fileName)
dstFile := filepath.Join(dstDir, fileName)
err = os.Rename(dstFile, srcFile)
if err != nil {
log.Error("rollback move schema files failed and continue" + err.Error())
}
}
}
return err
}
func CreateOrUpdateFile(filepath string, content []byte, rollbackOperations *[]FileDoRecord, isRollback bool) error {
err := ExistDir(path.Dir(filepath))
if !isRollback {
mutex := GetOrCreateMutex(path.Dir(filepath))
mutex.Lock()
defer mutex.Unlock()
}
if err != nil {
log.Error("failed to build new schema file dir " + filepath + ", " + err.Error())
return err
}
fileExist := true
_, err = os.Stat(filepath)
if err != nil {
fileExist = false
}
if fileExist {
oldcontent, err := os.ReadFile(filepath)
if err != nil {
log.Error("failed to read content to file " + filepath + ", " + err.Error())
return err
}
*rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: oldcontent})
} else {
*rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: nil})
}
err = os.WriteFile(filepath, content, 0600)
if err != nil {
log.Error("failed to create file " + filepath + ", " + err.Error())
return err
}
return nil
}
func DeleteFile(filepath string, rollbackOperations *[]FileDoRecord) error {
_, err := os.Stat(filepath)
if err != nil {
log.Error("file does not exist when deleting file " + filepath + ", " + err.Error())
return nil
}
oldcontent, err := os.ReadFile(filepath)
if err != nil {
log.Error("failed to read content to file " + filepath + ", " + err.Error())
return err
}
*rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: oldcontent})
err = os.Remove(filepath)
if err != nil {
log.Error("failed to delete file " + filepath + ", " + err.Error())
return err
}
return nil
}
func CleanDir(dir string) error {
mutex := GetOrCreateMutex(dir)
mutex.Lock()
defer delete(MutexMap, dir)
defer mutex.Unlock()
rollbackOperations := []FileDoRecord{}
_, err := os.Stat(dir)
if err != nil {
return nil
}
files, err := os.ReadDir(dir)
if err != nil {
return nil
}
for _, file := range files {
if file.IsDir() {
continue
}
filepath := filepath.Join(dir, file.Name())
err = DeleteFile(filepath, &rollbackOperations)
if err != nil {
break
}
}
if err != nil {
log.Error("Occur error when create schema files, begain rollback... " + err.Error())
Rollback(rollbackOperations)
return err
}
err = os.Remove(dir)
if err != nil {
log.Error("OOccur error when remove service schema dir, begain rollback... " + err.Error())
Rollback(rollbackOperations)
return err
}
return nil
}
func ReadFile(filepath string) ([]byte, error) {
// check the file is empty
mutex := GetOrCreateMutex(path.Dir(filepath))
mutex.RLocker()
defer mutex.RLocker()
content, err := os.ReadFile(filepath)
if err != nil {
log.Error("failed to read content to file " + filepath + ", " + err.Error())
return nil, err
}
return content, nil
}
func CountInDomain(dir string) (int, error) {
mutex := GetOrCreateMutex(dir)
mutex.RLock()
defer mutex.RUnlock()
files, err := os.ReadDir(dir)
if err != nil {
log.Error("failed to read directory " + dir + ", " + err.Error())
return 0, err
}
count := 0
for _, projectFolder := range files {
if projectFolder.IsDir() {
count++
}
}
// count kv numbers
return count, nil
}
func ReadAllKvsFromProjectFolder(dir string) ([][]byte, error) {
var kvs [][]byte
kvDir, err := os.ReadDir(dir)
if err != nil {
log.Error("failed to read directory " + dir + ", " + err.Error())
return nil, err
}
for _, file := range kvDir {
if file.IsDir() {
filepath := path.Join(dir, file.Name(), NewstKVFile)
content, err := ReadFile(filepath)
if err != nil {
log.Error("failed to read content to file " + filepath + ", " + err.Error())
return nil, err
}
kvs = append(kvs, content)
}
}
return kvs, nil
}
func ReadAllFiles(dir string) ([]string, [][]byte, error) {
mutex := GetOrCreateMutex(dir)
mutex.RLock()
defer mutex.RUnlock()
files := []string{}
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
if !strings.Contains(path, NewstKVFile) {
files = append(files, path)
}
return nil
})
if err != nil {
return nil, nil, err
}
var contentArray [][]byte
for _, file := range files {
content, err := os.ReadFile(file)
if err != nil {
log.Error("failed to read content from schema file " + file + ", " + err.Error())
return nil, nil, err
}
contentArray = append(contentArray, content)
}
return files, contentArray, nil
}
func Rollback(rollbackOperations []FileDoRecord) {
rollbackMutexLock.Lock()
defer rollbackMutexLock.Unlock()
var err error
for _, fileOperation := range rollbackOperations {
if fileOperation.content == nil {
err = DeleteFile(fileOperation.filepath, &[]FileDoRecord{})
} else {
err = CreateOrUpdateFile(fileOperation.filepath, fileOperation.content, &[]FileDoRecord{}, true)
}
if err != nil {
log.Error("Occur error when rolling back schema files: " + err.Error())
}
}
}