/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"fmt"
	"os"

	"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
	"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
	"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
	"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging"
	"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput"
	"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/promptutil"
	"github.com/jedib0t/go-pretty/v6/progress"
	"github.com/jedib0t/go-pretty/v6/table"

	"github.com/spf13/cobra"
	"github.com/spf13/pflag"
)

//nolint:dupl
var DeleteCmd = &cobra.Command{
	Use:   "delete",
	Short: "Delete a backup record",
	Run: func(cmd *cobra.Command, args []string) {
		cmd.Flags().VisitAll(func(flag *pflag.Flag) {
			fmt.Printf("Flag: %s Value: %s\n", flag.Name, flag.Value)
		})

		if CSN == "" && RecordID == "" {
			logging.Error("Please specify csn or record id")
			return
		}

		if CSN != "" && RecordID != "" {
			logging.Error("Please specify only one of csn and record id")
			return
		}

		if err := deleteRecord(); err != nil {
			logging.Error(err.Error())
		}
	},
}

//nolint:dupl
func init() {
	RootCmd.AddCommand(DeleteCmd)

	DeleteCmd.Flags().StringVarP(&Host, "host", "H", "", "ss-proxy hostname or ip")
	_ = DeleteCmd.MarkFlagRequired("host")
	DeleteCmd.Flags().Uint16VarP(&Port, "port", "P", 0, "ss-proxy port")
	_ = DeleteCmd.MarkFlagRequired("port")
	DeleteCmd.Flags().StringVarP(&Username, "username", "u", "", "ss-proxy username")
	_ = DeleteCmd.MarkFlagRequired("username")
	DeleteCmd.Flags().StringVarP(&Password, "password", "p", "", "ss-proxy password")
	_ = DeleteCmd.MarkFlagRequired("password")
	DeleteCmd.Flags().StringVarP(&BackupPath, "dn-backup-path", "B", "", "openGauss data backup path")
	_ = DeleteCmd.MarkFlagRequired("dn-backup-path")
	DeleteCmd.Flags().Uint16VarP(&AgentPort, "agent-port", "a", 443, "agent server port")
	_ = DeleteCmd.MarkFlagRequired("agent-port")

	DeleteCmd.Flags().StringVarP(&CSN, "csn", "", "", "commit sequence number")
	DeleteCmd.Flags().StringVarP(&RecordID, "id", "", "", "backup record id")
}

const (
	deletePromptFmt = "The backup record(ID: %s, CSN: %s) will be deleted forever.\n" +
		"Are you sure to continue? (Y/N)"
)

//nolint:dupl
func deleteRecord() error {
	// init local storage
	ls, err := pkg.NewLocalStorage(pkg.DefaultRootDir())
	if err != nil {
		return xerr.NewCliErr(fmt.Sprintf("new local storage failed. err: %s", err.Error()))
	}

	// get backup record
	var baks []*model.LsBackup
	baks, err = validate(ls, CSN, RecordID)
	if err != nil {
		return err
	}

	bak := baks[0]
	// check agent server status
	logging.Info("Checking agent server status...")
	if available := checkAgentServerStatus(bak); !available {
		return xerr.NewCliErr("one or more agent server are not available.")
	}

	prompt := fmt.Sprintf(deletePromptFmt, bak.Info.ID, bak.Info.CSN)
	err = promptutil.GetUserApproveInTerminal(prompt)
	if err != nil {
		return xerr.NewCliErr(fmt.Sprintf("%s", err))
	}

	// mark the target backup record to be deleted
	// meanwhile this record cannot be restored
	if err := ls.HideByName(bak.Info.FileName); err != nil {
		return xerr.NewCliErr("cannot mark backup record.")
	}

	// exec delete
	logging.Info("Start delete backup data to openGauss...")
	if err := _execDelete(bak); err != nil {
		return xerr.NewCliErr(fmt.Sprintf("exec delete failed. err: %s", err))
	}
	logging.Info("Delete backup data success!")

	// delete the backup record
	if err := ls.DeleteByHidedName(bak.Info.FileName); err != nil {
		return xerr.NewCliErr(fmt.Sprintf("exec delete backup record failed. err: %s", err))
	}

	logging.Info("Delete success!")
	return nil
}

func _execDelete(lsBackup *model.LsBackup) error {
	var (
		dataNodeMap       = make(map[string]*model.DataNode)
		totalNum          = len(lsBackup.SsBackup.StorageNodes)
		resultCh          = make(chan *model.DeleteBackupResult, totalNum)
		dnResult          = make([]*model.DeleteBackupResult, 0)
		deleteFinalStatus = "Completed"
	)
	for _, dn := range lsBackup.DnList {
		dataNodeMap[dn.IP] = dn
	}

	if totalNum == 0 {
		logging.Info("No data node need to delete backup files")
		return nil
	}

	pw := prettyoutput.NewProgressPrinter(prettyoutput.ProgressPrintOption{
		NumTrackersExpected: totalNum,
	})

	go pw.Render()

	for _, storagenode := range lsBackup.SsBackup.StorageNodes {
		sn := storagenode
		if dn, ok := dataNodeMap[sn.IP]; !ok {
			logging.Warn(fmt.Sprintf("SKIPPED! data node %s:%d not found in backup info.", sn.IP, sn.Port))
			continue
		} else {
			as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
			backupInfo := &model.BackupInfo{
				ID: dn.BackupID,
			}
			task := &deletetask{
				As:       as,
				Sn:       sn,
				Dn:       dn,
				ResultCh: resultCh,
				Backup:   backupInfo,
			}

			tracker := &progress.Tracker{
				Message: fmt.Sprintf("Deleting backup files  # %s:%d", sn.IP, sn.Port),
			}
			pw.AppendTracker(tracker)
			go pw.UpdateProgress(tracker, task.checkProgress)
		}
	}

	pw.BlockedRendered()

	close(resultCh)

	for result := range resultCh {
		dnResult = append(dnResult, result)
		if result.Status != "Completed" {
			deleteFinalStatus = "Failed"
		}
	}

	t := table.NewWriter()
	t.SetOutputMirror(os.Stdout)
	t.SetTitle("Delete Backup Files Result")
	t.AppendHeader(table.Row{"#", "Node IP", "Node Port", "Result", "Message"})
	t.SetColumnConfigs([]table.ColumnConfig{{Number: 5, WidthMax: 50}})

	for i, result := range dnResult {
		t.AppendRow([]interface{}{i + 1, result.IP, result.Port, result.Status, result.Msg})
		t.AppendSeparator()
	}

	t.Render()

	if deleteFinalStatus == "Failed" {
		return xerr.NewCliErr("delete failed, please check the log for more details.")
	}

	return nil
}

type deletetask struct {
	As       pkg.IAgentServer
	Sn       *model.StorageNode
	Dn       *model.DataNode
	ResultCh chan *model.DeleteBackupResult
	Backup   *model.BackupInfo
}

func (t *deletetask) checkProgress() (bool, error) {
	var (
		err error
	)
	in := &model.DeleteBackupIn{
		DBPort:       t.Sn.Port,
		DBName:       t.Sn.Database,
		Username:     t.Sn.Username,
		Password:     t.Sn.Password,
		BackupID:     t.Backup.ID,
		DnBackupPath: BackupPath,
		Instance:     defaultInstance,
	}

	r := &model.DeleteBackupResult{
		IP:   t.Sn.IP,
		Port: t.Sn.Port,
	}

	if err = t.As.DeleteBackup(in); err != nil {
		r.Status = model.SsBackupStatusFailed
		r.Msg = err.Error()
		t.ResultCh <- r
		return false, err
	}

	r.Status = model.SsBackupStatusCompleted
	t.ResultCh <- r

	return true, nil
}
