| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package cmd |
| |
| import ( |
| "fmt" |
| "os" |
| |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput" |
| "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/promptutil" |
| "github.com/jedib0t/go-pretty/v6/progress" |
| "github.com/jedib0t/go-pretty/v6/table" |
| |
| "github.com/spf13/cobra" |
| "github.com/spf13/pflag" |
| ) |
| |
| //nolint:dupl |
| var DeleteCmd = &cobra.Command{ |
| Use: "delete", |
| Short: "Delete a backup record", |
| Run: func(cmd *cobra.Command, args []string) { |
| cmd.Flags().VisitAll(func(flag *pflag.Flag) { |
| fmt.Printf("Flag: %s Value: %s\n", flag.Name, flag.Value) |
| }) |
| |
| if CSN == "" && RecordID == "" { |
| logging.Error("Please specify csn or record id") |
| return |
| } |
| |
| if CSN != "" && RecordID != "" { |
| logging.Error("Please specify only one of csn and record id") |
| return |
| } |
| |
| if err := deleteRecord(); err != nil { |
| logging.Error(err.Error()) |
| } |
| }, |
| } |
| |
| //nolint:dupl |
| func init() { |
| RootCmd.AddCommand(DeleteCmd) |
| |
| DeleteCmd.Flags().StringVarP(&Host, "host", "H", "", "ss-proxy hostname or ip") |
| _ = DeleteCmd.MarkFlagRequired("host") |
| DeleteCmd.Flags().Uint16VarP(&Port, "port", "P", 0, "ss-proxy port") |
| _ = DeleteCmd.MarkFlagRequired("port") |
| DeleteCmd.Flags().StringVarP(&Username, "username", "u", "", "ss-proxy username") |
| _ = DeleteCmd.MarkFlagRequired("username") |
| DeleteCmd.Flags().StringVarP(&Password, "password", "p", "", "ss-proxy password") |
| _ = DeleteCmd.MarkFlagRequired("password") |
| DeleteCmd.Flags().StringVarP(&BackupPath, "dn-backup-path", "B", "", "openGauss data backup path") |
| _ = DeleteCmd.MarkFlagRequired("dn-backup-path") |
| DeleteCmd.Flags().Uint16VarP(&AgentPort, "agent-port", "a", 443, "agent server port") |
| _ = DeleteCmd.MarkFlagRequired("agent-port") |
| |
| DeleteCmd.Flags().StringVarP(&CSN, "csn", "", "", "commit sequence number") |
| DeleteCmd.Flags().StringVarP(&RecordID, "id", "", "", "backup record id") |
| } |
| |
// deletePromptFmt is the confirmation prompt template used before a backup
// record is deleted. Its two %s verbs take the record ID and the CSN, in
// that order.
const deletePromptFmt = "The backup record(ID: %s, CSN: %s) will be deleted forever.\n" +
	"Are you sure to continue? (Y/N)"
| |
| //nolint:dupl |
| func deleteRecord() error { |
| // init local storage |
| ls, err := pkg.NewLocalStorage(pkg.DefaultRootDir()) |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("new local storage failed. err: %s", err.Error())) |
| } |
| |
| // get backup record |
| var baks []*model.LsBackup |
| baks, err = validate(ls, CSN, RecordID) |
| if err != nil { |
| return err |
| } |
| |
| bak := baks[0] |
| // check agent server status |
| logging.Info("Checking agent server status...") |
| if available := checkAgentServerStatus(bak); !available { |
| return xerr.NewCliErr("one or more agent server are not available.") |
| } |
| |
| prompt := fmt.Sprintf(deletePromptFmt, bak.Info.ID, bak.Info.CSN) |
| err = promptutil.GetUserApproveInTerminal(prompt) |
| if err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("%s", err)) |
| } |
| |
| // mark the target backup record to be deleted |
| // meanwhile this record cannot be restored |
| if err := ls.HideByName(bak.Info.FileName); err != nil { |
| return xerr.NewCliErr("cannot mark backup record.") |
| } |
| |
| // exec delete |
| logging.Info("Start delete backup data to openGauss...") |
| if err := _execDelete(bak); err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("exec delete failed. err: %s", err)) |
| } |
| logging.Info("Delete backup data success!") |
| |
| // delete the backup record |
| if err := ls.DeleteByHidedName(bak.Info.FileName); err != nil { |
| return xerr.NewCliErr(fmt.Sprintf("exec delete backup record failed. err: %s", err)) |
| } |
| |
| logging.Info("Delete success!") |
| return nil |
| } |
| |
| func _execDelete(lsBackup *model.LsBackup) error { |
| var ( |
| dataNodeMap = make(map[string]*model.DataNode) |
| totalNum = len(lsBackup.SsBackup.StorageNodes) |
| resultCh = make(chan *model.DeleteBackupResult, totalNum) |
| dnResult = make([]*model.DeleteBackupResult, 0) |
| deleteFinalStatus = "Completed" |
| ) |
| for _, dn := range lsBackup.DnList { |
| dataNodeMap[dn.IP] = dn |
| } |
| |
| if totalNum == 0 { |
| logging.Info("No data node need to delete backup files") |
| return nil |
| } |
| |
| pw := prettyoutput.NewProgressPrinter(prettyoutput.ProgressPrintOption{ |
| NumTrackersExpected: totalNum, |
| }) |
| |
| go pw.Render() |
| |
| for _, storagenode := range lsBackup.SsBackup.StorageNodes { |
| sn := storagenode |
| if dn, ok := dataNodeMap[sn.IP]; !ok { |
| logging.Warn(fmt.Sprintf("SKIPPED! data node %s:%d not found in backup info.", sn.IP, sn.Port)) |
| continue |
| } else { |
| as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort)) |
| backupInfo := &model.BackupInfo{ |
| ID: dn.BackupID, |
| } |
| task := &deletetask{ |
| As: as, |
| Sn: sn, |
| Dn: dn, |
| ResultCh: resultCh, |
| Backup: backupInfo, |
| } |
| |
| tracker := &progress.Tracker{ |
| Message: fmt.Sprintf("Deleting backup files # %s:%d", sn.IP, sn.Port), |
| } |
| pw.AppendTracker(tracker) |
| go pw.UpdateProgress(tracker, task.checkProgress) |
| } |
| } |
| |
| pw.BlockedRendered() |
| |
| close(resultCh) |
| |
| for result := range resultCh { |
| dnResult = append(dnResult, result) |
| if result.Status != "Completed" { |
| deleteFinalStatus = "Failed" |
| } |
| } |
| |
| t := table.NewWriter() |
| t.SetOutputMirror(os.Stdout) |
| t.SetTitle("Delete Backup Files Result") |
| t.AppendHeader(table.Row{"#", "Node IP", "Node Port", "Result", "Message"}) |
| t.SetColumnConfigs([]table.ColumnConfig{{Number: 5, WidthMax: 50}}) |
| |
| for i, result := range dnResult { |
| t.AppendRow([]interface{}{i + 1, result.IP, result.Port, result.Status, result.Msg}) |
| t.AppendSeparator() |
| } |
| |
| t.Render() |
| |
| if deleteFinalStatus == "Failed" { |
| return xerr.NewCliErr("delete failed, please check the log for more details.") |
| } |
| |
| return nil |
| } |
| |
| type deletetask struct { |
| As pkg.IAgentServer |
| Sn *model.StorageNode |
| Dn *model.DataNode |
| ResultCh chan *model.DeleteBackupResult |
| Backup *model.BackupInfo |
| } |
| |
| func (t *deletetask) checkProgress() (bool, error) { |
| var ( |
| err error |
| ) |
| in := &model.DeleteBackupIn{ |
| DBPort: t.Sn.Port, |
| DBName: t.Sn.Database, |
| Username: t.Sn.Username, |
| Password: t.Sn.Password, |
| BackupID: t.Backup.ID, |
| DnBackupPath: BackupPath, |
| Instance: defaultInstance, |
| } |
| |
| r := &model.DeleteBackupResult{ |
| IP: t.Sn.IP, |
| Port: t.Sn.Port, |
| } |
| |
| if err = t.As.DeleteBackup(in); err != nil { |
| r.Status = model.SsBackupStatusFailed |
| r.Msg = err.Error() |
| t.ResultCh <- r |
| return false, err |
| } |
| |
| r.Status = model.SsBackupStatusCompleted |
| t.ResultCh <- r |
| |
| return true, nil |
| } |