blob: 7cac7a7a0b3cbf5b6afb7488407ce4ca5b096a4b [file]
package cmd
import (
"errors"
"fmt"
"os"
"strings"
"github.com/apache/cloudberry-go-libs/cluster"
"github.com/apache/cloudberry-go-libs/dbconn"
"github.com/apache/cloudberry-go-libs/gplog"
"github.com/spf13/cobra"
"github.com/blang/semver"
)
// ClusterData bundles the state that is passed between the steps of a single
// cluster command run (setup, status report, remote execution, output).
// It is exported for testing.
type ClusterData struct {
// Cluster is built in doSetup from the retrieved segment configuration.
Cluster *cluster.Cluster
// Output is the result of ExecuteClusterCommand; clusterRun sets it and
// GenerateOutput reads its NumErrors and FailedCommands.
Output *cluster.RemoteOutput
// NumHosts is the number of SSH commands generated for the run
// (clusterRun sets it to len(commandList)).
NumHosts int
// connection is the database connection opened in doSetup; clusterRun
// closes it when it returns.
connection *dbconn.DBConn
}
// createCobraCommand builds a cobra command with the given use string and
// short description.
//
// When cmd is nil the returned command has no Run function (it only groups
// subcommands). Otherwise Run performs doSetup, executes cmd across the
// cluster via clusterRun if setup succeeded, and terminates the process
// through exitWithReturnCode with whichever error (if any) occurred.
func createCobraCommand(use string, short string, cmd *command) *cobra.Command {
	cobraCommand := &cobra.Command{Use: use, Short: short}
	if cmd == nil {
		return cobraCommand
	}

	cobraCommand.Run = func(_ *cobra.Command, _ []string) {
		clusterData, setupErr := doSetup()
		if setupErr != nil {
			exitWithReturnCode(setupErr)
			return
		}
		exitWithReturnCode(clusterRun(cmd, clusterData))
	}
	return cobraCommand
}
// Cobra commands for "cluster" and its subcommands. Each one is produced by
// createCobraCommand; the *command values passed in (InitCommand,
// StartCommand, ...) are declared elsewhere in this package.
var (
// clusterCmd is the parent command; it is created with a nil *command, so it
// has no Run function of its own.
clusterCmd = createCobraCommand("cluster", "Perform <command> on each segment host in the cluster", nil)
initCmd = createCobraCommand("init", "(deprecated) Install PXF extension under $GPHOME on coordinator, standby coordinator, and all segment hosts", &InitCommand)
startCmd = createCobraCommand("start", "Start the PXF server instances on coordinator, standby coordinator, and all segment hosts", &StartCommand)
stopCmd = createCobraCommand("stop", "Stop the PXF server instances on coordinator, standby coordinator, and all segment hosts", &StopCommand)
statusCmd = createCobraCommand("status", "Get status of PXF servers on coordinator, standby coordinator, and all segment hosts", &StatusCommand)
syncCmd = createCobraCommand("sync", "Sync PXF configs from coordinator to standby coordinator and all segment hosts. Use --delete to delete extraneous remote files", &SyncCommand)
resetCmd = createCobraCommand("reset", "(deprecated) No operation", &ResetCommand)
registerCmd = createCobraCommand("register", "Install PXF extension under $GPHOME on coordinator, standby coordinator, and all segment hosts", &RegisterCommand)
restartCmd = createCobraCommand("restart", "Restart the PXF server on coordinator, standby coordinator, and all segment hosts", &RestartCommand)
prepareCmd = createCobraCommand("prepare", "Prepares a new base directory specified by the $PXF_BASE environment variable", &PrepareCommand)
migrateCmd = createCobraCommand("migrate", "Migrates configurations from older installations of PXF", &MigrateCommand)
// DeleteOnSync reports whether rsync should be run with --delete. It is bound
// to the "--delete"/"-d" flag of syncCmd in init and is exported for tests.
DeleteOnSync bool
)
func init() {
rootCmd.AddCommand(clusterCmd)
clusterCmd.AddCommand(initCmd)
clusterCmd.AddCommand(startCmd)
clusterCmd.AddCommand(stopCmd)
clusterCmd.AddCommand(statusCmd)
syncCmd.Flags().BoolVarP(&DeleteOnSync, "delete", "d", false, "delete extraneous files on remote host")
clusterCmd.AddCommand(syncCmd)
clusterCmd.AddCommand(resetCmd)
clusterCmd.AddCommand(registerCmd)
clusterCmd.AddCommand(restartCmd)
clusterCmd.AddCommand(prepareCmd)
clusterCmd.AddCommand(migrateCmd)
}
// exitWithReturnCode terminates the process: exit status 1 when err is
// non-nil, 0 otherwise. It never returns.
func exitWithReturnCode(err error) {
	code := 0
	if err != nil {
		code = 1
	}
	os.Exit(code)
}
// handlePlurality returns the suffix to append to a noun counted by num:
// the empty string when num is exactly 1, and "s" for every other value
// (including zero and negative numbers).
func handlePlurality(num int) string {
	switch num {
	case 1:
		return ""
	default:
		return "s"
	}
}
// GenerateStatusReport logs, at info level, the status line announcing what
// cmd is about to do. It is exported for testing.
//
// If cmd has no standby message, the status format is filled with
// clusterData.NumHosts and its plural suffix. Otherwise the reported host
// count is NumHosts reduced by one when cmd.whereToRun has the
// cluster.INCLUDE_MASTER flag set, and by one more when the standby is the
// only content on its host; in that last case the standby message is also
// passed as the first format argument (an empty string is passed otherwise).
func GenerateStatusReport(cmd *command, clusterData *ClusterData) {
	standbyText, mentionsStandby := cmd.messages[standby]
	if !mentionsStandby {
		// This command has nothing to say about the standby.
		total := clusterData.NumHosts
		gplog.Info("%s", fmt.Sprintf(cmd.messages[status], total, handlePlurality(total)))
		return
	}

	remaining := clusterData.NumHosts
	if cmd.whereToRun&cluster.INCLUDE_MASTER == cluster.INCLUDE_MASTER {
		remaining--
	}

	prefix := ""
	if isStandbyAloneOnHost(clusterData) {
		prefix = standbyText
		remaining--
	}

	gplog.Info("%s", fmt.Sprintf(cmd.messages[status], prefix, remaining, handlePlurality(remaining)))
}
// GenerateOutput reports the outcome of a cluster command run. It is exported
// for testing.
//
// When clusterData.Output.NumErrors is zero it logs the success message and
// returns nil. Otherwise it builds one "host ==> message" line per failed
// command, logs the error summary at info level and the per-host lines at
// error level, and returns an error whose text is those per-host lines.
//
// The message for a host is its Stderr, or its Stdout when Stderr is empty.
// Only the first two lines of the message are kept; "..." is appended when the
// message had more than two lines.
func GenerateOutput(cmd *command, clusterData *ClusterData) error {
	failures := clusterData.Output.NumErrors
	total := clusterData.NumHosts
	if failures == 0 {
		// With zero failures the number of successful hosts equals total.
		gplog.Info(cmd.messages[success], total, total, handlePlurality(total))
		return nil
	}

	var report strings.Builder
	for _, failed := range clusterData.Output.FailedCommands {
		detail := failed.Stderr
		if detail == "" {
			detail = failed.Stdout
		}

		// strings.Split always yields at least one element here.
		parts := strings.Split(detail, "\n")
		var summary string
		switch {
		case len(parts) > 2:
			summary = parts[0] + "\n" + parts[1] + "..."
		case len(parts) == 2:
			summary = parts[0] + "\n" + parts[1]
		default:
			summary = parts[0]
		}

		fmt.Fprintf(&report, "%s ==> %s\n", failed.Host, summary)
	}

	response := report.String()
	gplog.Info("%s", fmt.Sprintf("ERROR: "+cmd.messages[err], failures, total, handlePlurality(total)))
	gplog.Error("%s", response)
	return errors.New(response)
}
// doSetup opens a single database connection to the "postgres" database using
// the environment's connection settings, retrieves the segment configuration,
// and returns a ClusterData holding the resulting cluster and the open
// connection.
//
// On success the caller owns the connection (clusterRun closes it). On any
// failure the error is logged, no connection is left open, and (nil, err) is
// returned.
func doSetup() (*ClusterData, error) {
	connection := dbconn.NewDBConnFromEnvironment("postgres")
	if err := connection.Connect(1); err != nil {
		gplog.Error("%s", fmt.Sprintf("ERROR: Could not connect to Cloudberry.\n%s\n"+
			"Please make sure that your Apache Cloudberry is running and you are on the coordinator node.", err.Error()))
		return nil, err
	}

	// Override the connection's version with a fixed 7.1.0 before querying the
	// segment configuration.
	// NOTE(review): presumably this makes the go-libs use their GPDB 7 code
	// paths against Cloudberry — confirm.
	connection.Version = dbconn.GPDBVersion{VersionString: "7.1.0", SemVer: semver.MustParse("7.1.0")}

	segConfigs, err := cluster.GetSegmentConfiguration(connection, true)
	if err != nil {
		// The connection is not handed to the caller on this path, so it
		// must be closed here or it would leak.
		connection.Close()
		gplog.Error("ERROR: Could not retrieve segment information from GPDB.\n%s", err.Error())
		return nil, err
	}

	return &ClusterData{Cluster: cluster.NewCluster(segConfigs), connection: connection}, nil
}
// clusterRun executes cmd across the cluster described by clusterData and
// returns the result of GenerateOutput.
//
// The database connection in clusterData is closed when this function
// returns, on every path. Before anything is executed, cmd.Warn is given
// os.Stdin; if it returns an error that error is logged at info level and
// returned. A failure to obtain the function to execute is logged at error
// level and returned. Otherwise the SSH command list is generated,
// clusterData.NumHosts is set to its length, the status report is logged, the
// commands are executed and clusterData.Output is set to their result.
func clusterRun(cmd *command, clusterData *ClusterData) error {
	defer clusterData.connection.Close()

	if warnErr := cmd.Warn(os.Stdin); warnErr != nil {
		gplog.Info("%s", warnErr)
		return warnErr
	}

	remoteFunc, lookupErr := cmd.GetFunctionToExecute()
	if lookupErr != nil {
		gplog.Error("Error: %s", lookupErr)
		return lookupErr
	}

	sshCommands := clusterData.Cluster.GenerateSSHCommandList(cmd.whereToRun, remoteFunc)
	clusterData.NumHosts = len(sshCommands)
	GenerateStatusReport(cmd, clusterData)

	clusterData.Output = clusterData.Cluster.ExecuteClusterCommand(cmd.whereToRun, sshCommands)
	return GenerateOutput(cmd, clusterData)
}
// isStandbyAloneOnHost reports whether the standby coordinator's host carries
// exactly one content. The standby host is looked up as content -1 with role
// "m"; an empty host name means there is no standby coordinator, in which case
// the result is false.
func isStandbyAloneOnHost(clusterData *ClusterData) bool {
	topology := clusterData.Cluster
	if host := topology.GetHostForContent(-1, "m"); host != "" {
		return len(topology.GetContentsForHost(host)) == 1
	}
	return false
}