| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package cmd |
| |
| import ( |
| "fmt" |
| "os" |
| "time" |
| |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/promptutil" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/timeutil" |
| |
| "github.com/google/uuid" |
| "github.com/jedib0t/go-pretty/v6/progress" |
| "github.com/jedib0t/go-pretty/v6/table" |
| "github.com/spf13/cobra" |
| "github.com/spf13/pflag" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
| const ( |
	// defaultInstance is the backup instance name used on openGauss nodes; it may become configurable in the future.
	defaultInstance = "ins-default-ss"
	// defaultShowDetailRetryTimes is the number of times to retry fetching backup details from an agent server.
	defaultShowDetailRetryTimes = 3

	backupPrompt = "Please check the disk space on all nodes and make sure there is enough space to back up or restore the data.\n" +
		"Are you sure you want to continue? (Y/N)"
| ) |
| |
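// filename is the name of the local backup info file; it is generated in
// exportData and reused by the later write and delete steps.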
| var filename string |
| |
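// BackupCmd is the cobra subcommand that backs up a ShardingSphere cluster
// through the agent servers running next to each openGauss node.
//
// Example invocation (illustrative; the binary name depends on how the CLI is built):
//
//	gs_pitr backup -H 127.0.0.1 -P 3307 -u root -p root \
//		-B /home/omm/backup -b FULL -j 4 -a 443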
| var BackupCmd = &cobra.Command{ |
| Use: "backup", |
| Short: "Backup a database cluster", |
| Run: func(cmd *cobra.Command, args []string) { |
		cmd.Flags().VisitAll(func(flag *pflag.Flag) {
			value := flag.Value.String()
			if flag.Name == "password" {
				value = "******" // do not echo the password in plain text
			}
			fmt.Printf("Flag: %s Value: %s\n", flag.Name, value)
		})
| |
		// convert BackupModeStr to BackupMode
		switch BackupModeStr {
		case "FULL", "full":
			BackupMode = model.DBBackModeFull
		case "PTRACK", "ptrack":
			BackupMode = model.DBBackModePTrack
		default:
			logging.Error(fmt.Sprintf("Invalid backup mode %q, expected FULL or PTRACK.", BackupModeStr))
			return
		}
| if BackupMode == model.DBBackModePTrack { |
			logging.Warn("Please make sure ptrack is configured correctly on all openGauss nodes. See https://support.huaweicloud.com/intl/zh-cn/devg-opengauss/opengauss_devg_1362.html for details.")
| } |
| |
| logging.Info(fmt.Sprintf("Default backup path: %s", pkg.DefaultRootDir())) |
| |
| // Start backup |
| if err := backup(); err != nil { |
| logging.Error(err.Error()) |
| } |
| }, |
| } |
| |
| func init() { |
| RootCmd.AddCommand(BackupCmd) |
| |
| BackupCmd.Flags().StringVarP(&Host, "host", "H", "", "ss-proxy hostname or ip") |
| _ = BackupCmd.MarkFlagRequired("host") |
| BackupCmd.Flags().Uint16VarP(&Port, "port", "P", 0, "ss-proxy port") |
| _ = BackupCmd.MarkFlagRequired("port") |
| BackupCmd.Flags().StringVarP(&Username, "username", "u", "", "ss-proxy username") |
| _ = BackupCmd.MarkFlagRequired("username") |
| BackupCmd.Flags().StringVarP(&Password, "password", "p", "", "ss-proxy password") |
| _ = BackupCmd.MarkFlagRequired("password") |
| BackupCmd.Flags().StringVarP(&BackupPath, "dn-backup-path", "B", "", "openGauss data backup path") |
| _ = BackupCmd.MarkFlagRequired("dn-backup-path") |
| BackupCmd.Flags().StringVarP(&BackupModeStr, "dn-backup-mode", "b", "", "openGauss data backup mode (FULL|PTRACK)") |
| _ = BackupCmd.MarkFlagRequired("dn-backup-mode") |
	BackupCmd.Flags().Uint8VarP(&ThreadsNum, "dn-threads-num", "j", 1, "number of openGauss backup threads")
| BackupCmd.Flags().Uint16VarP(&AgentPort, "agent-port", "a", 443, "agent server port") |
| _ = BackupCmd.MarkFlagRequired("agent-port") |
}
| |
// Steps of backup:
// 1. Lock the cluster
// 2. Export cluster metadata and save the local backup info
// 3. Check that every agent server is available
// 4. Check disk space and ask the user for confirmation
// 5. Send the backup command to the agent servers
// 6. Unlock the cluster
// 7. Persist the backup info file
// 8. Wait for all backups to finish and double-check their status
// 9. Update the backup info file with the final result
// nolint:gocognit
| func backup() error { |
| var ( |
| err error |
| lsBackup *model.LsBackup |
| cancel bool |
| ) |
| proxy, err := pkg.NewShardingSphereProxy(Username, Password, pkg.DefaultDBName, Host, Port) |
| if err != nil { |
		return xerr.NewCliErr(fmt.Sprintf("connect to ShardingSphere proxy failed, err: %s", err))
| } |
| |
| ls, err := pkg.NewLocalStorage(pkg.DefaultRootDir()) |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("Create local storage failed. err: %s", err)) |
| } |
| |
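	// On any failure below, best-effort cleanup: unlock the cluster and delete
	// whatever backup files have already been created. The warning is skipped
	// when the user canceled at the confirmation prompt.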
| defer func() { |
| if err != nil { |
| if !cancel { |
| logging.Warn("Try to unlock cluster ...") |
| } |
| if err := proxy.Unlock(); err != nil { |
				logging.Error(fmt.Sprintf("Backup failed and unlocking the cluster also failed. err: %s", err))
| } |
| |
| if lsBackup != nil { |
| deleteBackupFiles(ls, lsBackup, deleteModeQuiet) |
| } |
| } |
| }() |
| |
| // Step1. lock cluster |
	logging.Info("Locking cluster ...")
| err = proxy.LockForBackup() |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("Lock for backup failed. err: %s", err)) |
| } |
| |
| // Step2. Get cluster info and save local backup info |
	logging.Info("Exporting metadata ...")
| lsBackup, err = exportData(proxy, ls) |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("export backup data failed. err: %s", err)) |
| } |
	logging.Info(fmt.Sprintf("Export backup data succeeded, backup filename: %s", filename))
| |
| // Step3. Check agent server status |
| logging.Info("Checking agent server status...") |
| if available := checkAgentServerStatus(lsBackup); !available { |
		logging.Error("Canceled: one or more agent servers are not available.")
		err = xerr.NewCliErr("one or more agent servers are not available")
| return err |
| } |
| |
| // Step4. Show disk space |
| logging.Info("Checking disk space...") |
| err = checkDiskSpace(lsBackup) |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("check disk space failed. err: %s", err)) |
| } |
| |
	err = promptutil.GetUserApproveInTerminal(backupPrompt)
	if err != nil {
		cancel = true
		return xerr.NewCliErr(err.Error())
	}
| |
| // Step5. send backup command to agent-server. |
| logging.Info("Starting backup ...") |
| err = execBackup(lsBackup) |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("exec backup failed. err: %s", err)) |
| } |
| |
| // Step6. unlock cluster |
	logging.Info("Unlocking cluster ...")
| err = proxy.Unlock() |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("unlock cluster failed. err: %s", err)) |
| } |
| |
| // Step7. update backup file |
	logging.Info("Updating backup file ...")
| err = ls.WriteByJSON(filename, lsBackup) |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("update backup file failed. err: %s", err)) |
| } |
| |
| // Step8. check agent server backup |
	logging.Info("Checking backup status ...")
| status := checkBackupStatus(lsBackup) |
| logging.Info(fmt.Sprintf("Backup result: %s", status)) |
| if status != model.SsBackupStatusCompleted && status != model.SsBackupStatusCanceled { |
| err = xerr.NewCliErr("Backup failed") |
| return err |
| } |
| |
| // Step9. finished backup and update backup file |
	logging.Info("Updating backup file ...")
| err = ls.WriteByJSON(filename, lsBackup) |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("update backup file failed. err: %s", err)) |
| } |
| |
| logging.Info("Backup finished!") |
| return nil |
| } |
| |
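// exportData pulls the cluster metadata and storage-node list from the
// ss-proxy, assembles the initial LsBackup record (status: waiting), and
// persists it as JSON under a freshly generated filename.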
| func exportData(proxy pkg.IShardingSphereProxy, ls pkg.ILocalStorage) (lsBackup *model.LsBackup, err error) { |
| // Step1. export cluster metadata from ss-proxy |
| cluster, err := proxy.ExportMetaData() |
| if err != nil { |
| return nil, xerr.NewCliErr(fmt.Sprintf("export meta data failed. err: %s", err)) |
| } |
| |
| // Step2. export storage nodes from ss-proxy |
| nodes, err := proxy.ExportStorageNodes() |
| if err != nil { |
| return nil, xerr.NewCliErr(fmt.Sprintf("export storage nodes failed. err: %s", err)) |
| } |
| |
| // Step3. combine the backup contents |
| filename = ls.GenFilename(pkg.ExtnJSON) |
| csn := "" |
| if cluster.SnapshotInfo != nil { |
| csn = cluster.SnapshotInfo.Csn |
| } |
| |
| contents := &model.LsBackup{ |
| Info: &model.BackupMetaInfo{ |
| ID: uuid.New().String(), // generate uuid for this backup |
| CSN: csn, |
| StartTime: timeutil.Now().String(), |
| EndTime: timeutil.Init(), |
| BackupMode: BackupMode, |
| }, |
| SsBackup: &model.SsBackup{ |
| Status: model.SsBackupStatusWaiting, // default status of backup is model.SsBackupStatusWaiting |
| ClusterInfo: cluster, |
| StorageNodes: nodes, |
| }, |
| } |
| |
| // Step4. finally, save data with json to local |
| if err := ls.WriteByJSON(filename, contents); err != nil { |
| return nil, xerr.NewCliErr(fmt.Sprintf("write backup info by json failed. err: %s", err)) |
| } |
| |
| return contents, nil |
| } |
| |
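// execBackup sends the backup command to every storage node's agent server in
// parallel and collects the resulting data-node records into lsBackup.DnList.
// The cluster-level status becomes Running on success and Failed otherwise.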
| func execBackup(lsBackup *model.LsBackup) error { |
| sNodes := lsBackup.SsBackup.StorageNodes |
| dnCh := make(chan *model.DataNode, len(sNodes)) |
| g := new(errgroup.Group) |
| |
	logging.Info("Sending backup command to agent servers ...")
| |
| for _, node := range sNodes { |
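		// capture the loop variable so each goroutine backs up its own node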
| sn := node |
| as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort)) |
| g.Go(func() error { |
| return _execBackup(as, sn, dnCh) |
| }) |
| } |
| |
| err := g.Wait() |
| close(dnCh) |
| |
| // if backup failed, return error |
| if err != nil { |
| lsBackup.SsBackup.Status = model.SsBackupStatusFailed |
| return xerr.NewCliErr(err.Error()) |
| } |
| |
| // save data node list to lsBackup |
| for dn := range dnCh { |
| lsBackup.DnList = append(lsBackup.DnList, dn) |
| } |
| |
| lsBackup.SsBackup.Status = model.SsBackupStatusRunning |
| return nil |
| } |
| |
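// _execBackup issues a single backup request against one agent server and
// always pushes a DataNode record (success or failure) onto dnCh; dnCh is
// buffered to the number of storage nodes, so the send never blocks.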
| func _execBackup(as pkg.IAgentServer, node *model.StorageNode, dnCh chan *model.DataNode) error { |
| in := &model.BackupIn{ |
| DBPort: node.Port, |
| DBName: node.Database, |
| Username: node.Username, |
| Password: node.Password, |
| DnBackupPath: BackupPath, |
| DnThreadsNum: ThreadsNum, |
| DnBackupMode: BackupMode, |
| Instance: defaultInstance, |
| } |
| backupID, err := as.Backup(in) |
| status := model.SsBackupStatusRunning |
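	// On error, record the agent's error text as the node status so that it
	// surfaces in the final report instead of being lost.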
| if err != nil { |
| status = model.BackupStatus(err.Error()) |
| } |
| |
| // update DnList of lsBackup |
| dn := &model.DataNode{ |
| IP: node.IP, |
| Port: node.Port, |
| Status: status, |
| BackupID: backupID, |
| StartTime: timeutil.Now().String(), |
| EndTime: timeutil.Init(), |
| } |
| dnCh <- dn |
| if err != nil { |
| return fmt.Errorf("data node %s:%d backup error: %s", node.IP, node.Port, err) |
| } |
| return nil |
| } |
| |
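// checkBackupStatus polls every agent server until each node's backup reaches
// a terminal state, renders per-node progress, prints a summary table, and
// returns the aggregated status (Completed only if every node completed).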
| func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus { |
| var ( |
| dataNodeMap = make(map[string]*model.DataNode) |
| dnCh = make(chan *model.DataNode, len(lsBackup.DnList)) |
| backupFinalStatus = model.SsBackupStatusCompleted |
| totalNum = len(lsBackup.SsBackup.StorageNodes) |
| dnResult = make([]*model.DataNode, 0) |
| ) |
| |
| if totalNum == 0 { |
		logging.Info("No data nodes to back up")
| return model.SsBackupStatusCanceled |
| } |
| |
| for _, dn := range lsBackup.DnList { |
| dataNodeMap[dn.IP] = dn |
| } |
| |
| pw := prettyoutput.NewProgressPrinter(prettyoutput.ProgressPrintOption{ |
| NumTrackersExpected: totalNum, |
| }) |
| |
| go pw.Render() |
| |
| for i := 0; i < totalNum; i++ { |
| sn := lsBackup.SsBackup.StorageNodes[i] |
| as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort)) |
| dn := dataNodeMap[sn.IP] |
| backupInfo := &model.BackupInfo{} |
| task := &backuptask{ |
| As: as, |
| Sn: sn, |
| Dn: dn, |
| DnCh: dnCh, |
| Backup: backupInfo, |
| retries: defaultShowDetailRetryTimes, |
| } |
| tracker := &progress.Tracker{ |
| Message: fmt.Sprintf("Checking backup status # %s:%d", sn.IP, sn.Port), |
| Total: 0, |
| Units: progress.UnitsDefault, |
| } |
| pw.AppendTracker(tracker) |
| go pw.UpdateProgress(tracker, task.checkProgress) |
| } |
| |
| pw.BlockedRendered() |
| |
| close(dnCh) |
| |
| for dn := range dnCh { |
| dnResult = append(dnResult, dn) |
| if dn.Status != model.SsBackupStatusCompleted { |
| backupFinalStatus = model.SsBackupStatusFailed |
| } |
| } |
| |
| // print backup result formatted |
| t := table.NewWriter() |
| t.SetOutputMirror(os.Stdout) |
| t.SetTitle("Backup Task Result: %s", backupFinalStatus) |
| t.AppendHeader(table.Row{"#", "Data Node IP", "Data Node Port", "Result"}) |
| |
| for i, dn := range dnResult { |
| t.AppendRow([]interface{}{i + 1, dn.IP, dn.Port, dn.Status}) |
| t.AppendSeparator() |
| } |
| |
| t.Render() |
| |
| lsBackup.DnList = dnResult |
| lsBackup.SsBackup.Status = backupFinalStatus |
| lsBackup.Info.EndTime = timeutil.Now().String() |
| return backupFinalStatus |
| } |
| |
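// backuptask tracks the backup progress of a single data node via its agent server.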
| type backuptask struct { |
| As pkg.IAgentServer |
| Sn *model.StorageNode |
| Dn *model.DataNode |
| DnCh chan *model.DataNode |
| |
| Backup *model.BackupInfo |
| retries int |
| } |
| |
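// checkProgress is the callback polled by the progress writer. It asks the
// agent server for backup details, retrying transient errors up to retries
// times with a one-second pause, and returns true once the backup has
// completed or failed.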
| func (t *backuptask) checkProgress() (bool, error) { |
| var err error |
| in := &model.ShowDetailIn{ |
| DBPort: t.Sn.Port, |
| DBName: t.Sn.Database, |
| Username: t.Sn.Username, |
| Password: t.Sn.Password, |
| DnBackupID: t.Dn.BackupID, |
| DnBackupPath: BackupPath, |
| Instance: defaultInstance, |
| } |
| |
| t.Backup, err = t.As.ShowDetail(in) |
| if err != nil { |
| if t.retries == 0 { |
| t.Dn.Status = model.SsBackupStatusCheckError |
| t.DnCh <- t.Dn |
| return false, err |
| } |
| time.Sleep(time.Second * 1) |
| t.retries-- |
| return t.checkProgress() |
| } |
| |
| t.Dn.Status = t.Backup.Status |
| t.Dn.EndTime = timeutil.Now().String() |
| |
| if t.Backup.Status == model.SsBackupStatusCompleted || t.Backup.Status == model.SsBackupStatusFailed { |
| t.DnCh <- t.Dn |
| return true, nil |
| } |
| return false, nil |
| } |
| |
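// deleteMode controls the verbosity of deleteBackupFiles: deleteModeNormal
// renders progress and prints a result table, deleteModeQuiet deletes silently.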
| type deleteMode int |
| |
| const ( |
| deleteModeNormal deleteMode = iota |
| deleteModeQuiet |
| ) |
| |
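// deleteBackupFiles asks each agent server to remove the backup files of a
// failed or canceled run, then deletes the local backup info file.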
| func deleteBackupFiles(ls pkg.ILocalStorage, lsBackup *model.LsBackup, m deleteMode) { |
| var ( |
| dataNodeMap = make(map[string]*model.DataNode) |
| totalNum = len(lsBackup.SsBackup.StorageNodes) |
| resultCh = make(chan *model.DeleteBackupResult, totalNum) |
| ) |
| for _, dn := range lsBackup.DnList { |
| dataNodeMap[dn.IP] = dn |
| } |
| |
| if totalNum == 0 { |
		logging.Info("No data nodes have backup files to delete")
| return |
| } |
| |
| pw := prettyoutput.NewPW(totalNum) |
| go pw.Render() |
| |
| for _, sn := range lsBackup.SsBackup.StorageNodes { |
| sn := sn |
| dn, ok := dataNodeMap[sn.IP] |
| if !ok { |
| if m != deleteModeQuiet { |
| logging.Warn(fmt.Sprintf("SKIPPED! data node %s:%d not found in backup info.", sn.IP, sn.Port)) |
| } |
| continue |
| } |
| as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort)) |
| |
| go doDelete(as, sn, dn, resultCh, pw) |
| } |
| |
| if m != deleteModeQuiet { |
| |
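		// Poll until the progress writer has finished rendering every tracker,
		// then stop it before printing the summary table.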
| time.Sleep(time.Millisecond * 100) |
| for pw.IsRenderInProgress() { |
| if pw.LengthActive() == 0 { |
| pw.Stop() |
| } |
| time.Sleep(time.Millisecond * 100) |
| } |
| |
| close(resultCh) |
| |
| t := table.NewWriter() |
| t.SetOutputMirror(os.Stdout) |
| t.SetTitle("Delete Backup Files Result") |
| t.AppendHeader(table.Row{"#", "Node IP", "Node Port", "Result", "Message"}) |
| t.SetColumnConfigs([]table.ColumnConfig{{Number: 5, WidthMax: 50}}) |
| |
| idx := 0 |
| for result := range resultCh { |
| idx++ |
| t.AppendRow([]interface{}{idx, result.IP, result.Port, result.Status, result.Msg}) |
| t.AppendSeparator() |
| } |
| |
| t.Render() |
| } |
| |
	if err := ls.DeleteByName(filename); err != nil {
		logging.Warn(fmt.Sprintf("Delete backup info file failed, err: %s", err))
	}
| |
| if m != deleteModeQuiet { |
| logging.Info("Delete backup files finished") |
| } |
| } |
| |
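// doDelete removes the backup files of one data node through its agent server,
// reporting the outcome on resultCh and updating the progress tracker.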
| func doDelete(as pkg.IAgentServer, sn *model.StorageNode, dn *model.DataNode, resultCh chan *model.DeleteBackupResult, pw progress.Writer) { |
	tracker := progress.Tracker{Message: fmt.Sprintf("Deleting backup files # %s:%d", sn.IP, sn.Port), Total: 0, Units: progress.UnitsDefault}
| |
| pw.AppendTracker(&tracker) |
| |
| in := &model.DeleteBackupIn{ |
| DBPort: sn.Port, |
| DBName: sn.Database, |
| Username: sn.Username, |
| Password: sn.Password, |
| DnBackupPath: BackupPath, |
| BackupID: dn.BackupID, |
| Instance: defaultInstance, |
| } |
| |
| r := &model.DeleteBackupResult{ |
| IP: sn.IP, |
| Port: sn.Port, |
| } |
| |
| if err := as.DeleteBackup(in); err != nil { |
| r.Status = model.SsBackupStatusFailed |
| r.Msg = err.Error() |
| resultCh <- r |
| tracker.MarkAsErrored() |
| } else { |
| tracker.MarkAsDone() |
| r.Status = model.SsBackupStatusCompleted |
| resultCh <- r |
| } |
| } |