blob: 84866d3c55ef8860da2d28b991ed71fa24c90d9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package pkg
import (
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/cons"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg/model"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/cmds"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/gsutil"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/logging"
"github.com/dlclark/regexp2"
)
type (
// openGauss is the IOpenGauss implementation in this file. Its methods format
// the command templates declared in the const block below and run them through
// the cmds package using the configured shell.
openGauss struct {
// shell is passed as the first argument to cmds.Exec / cmds.AsyncExec.
shell string
// pgData is the data directory used for --pgdata and as the source or
// destination of the mv commands.
pgData string
// pgDataTemp is the "temp" sibling path that NewOpenGauss derives from pgData.
pgDataTemp string
// log receives the Debug and Error messages emitted by the methods.
log logging.ILog
}
// IOpenGauss is the set of backup, restore and server-control operations
// exposed by this package for an openGauss instance.
IOpenGauss interface {
// AsyncBackup runs `gs_probackup backup` and returns the backup ID parsed
// from the command output.
AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8, dbPort uint16) (string, error)
// ShowBackup returns the first backup reported by `gs_probackup show` for
// the given instance and backup ID.
ShowBackup(backupPath, instanceName, backupID string) (*model.Backup, error)
// Init runs `gs_probackup init` for backupPath.
Init(backupPath string) error
// AddInstance runs `gs_probackup add-instance` for the configured pgData.
AddInstance(backupPath, instance string) error
// DelInstance runs `gs_probackup del-instance`.
DelInstance(backupPath, instance string) error
// DelBackup runs `gs_probackup delete` for a single backup ID.
DelBackup(backupPath, instance, backupID string) error
// Start runs `gs_ctl start` on the configured pgData.
Start() error
// Stop runs `gs_ctl stop` on the configured pgData.
Stop() error
// Status runs `gs_ctl status` and returns "Running" or "Stopped".
Status() (string, error)
// Restore runs `gs_probackup restore` into the configured pgData.
Restore(backupPath, instance, backupID string, threadsNum uint8) error
// ShowBackupList returns every backup reported by `gs_probackup show` for
// the given instance.
ShowBackupList(backupPath, instanceName string) ([]*model.Backup, error)
// Auth validates the connection parameters, opens a connection through
// gsutil and pings it.
Auth(user, password, dbName string, dbPort uint16) error
// CheckSchema opens a connection through gsutil and calls its CheckSchema
// with the given schema name.
CheckSchema(user, password, dbName string, dbPort uint16, schema string) error
// MvTempToPgData runs `mv` from the temp directory to pgData.
MvTempToPgData() error
// MvPgDataToTemp runs `mv` from pgData to the temp directory.
MvPgDataToTemp() error
// CleanPgDataTemp runs `rm -r` on the temp directory.
CleanPgDataTemp() error
}
)
// Compile-time check that *openGauss satisfies IOpenGauss.
var _ IOpenGauss = (*openGauss)(nil)
// NewOpenGauss returns an IOpenGauss that runs its commands through shell
// against the data directory pgData and reports through log.
//
// The temporary data directory is the sibling of pgData named "temp"; for
// example "/data/og" yields "/data/temp". Trailing slashes on pgData are
// ignored when locating the parent directory, so "/data/og/" also yields
// "/data/temp" rather than "/data/og/temp", a directory nested inside pgData
// that `mv pgData temp` could never be moved into. pgData itself is stored
// exactly as given.
func NewOpenGauss(shell, pgData string, log logging.ILog) IOpenGauss {
	// Strip trailing separators before splitting. A path made only of
	// separators (or an empty one) is left untouched so its result is unchanged.
	base := strings.TrimRight(pgData, "/")
	if base == "" {
		base = pgData
	}

	// Replace the last path element with "temp".
	dirs := strings.Split(base, "/")
	dirs = append(dirs[:len(dirs)-1], "temp")

	return &openGauss{
		shell:      shell,
		pgData:     pgData,
		pgDataTemp: strings.Join(dirs, "/"),
		log:        log,
	}
}
// Command-line templates formatted with fmt.Sprintf by the openGauss methods.
// The gs_probackup templates end in "2>&1" so stderr is merged into stdout.
const (
// _backupFmt args: backup path, instance, backup mode, pgdata, threads, port.
_backupFmt = "gs_probackup backup --backup-path=%s --instance=%s --backup-mode=%s --pgdata=%s --threads=%d --pgport %d 2>&1"
// _showFmt args: instance, backup path, backup ID (note the instance comes first).
_showFmt = "gs_probackup show --instance=%s --backup-path=%s --backup-id=%s --format=json 2>&1"
// _delBackupFmt args: backup path, instance, backup ID.
_delBackupFmt = "gs_probackup delete --backup-path=%s --instance=%s --backup-id=%s 2>&1"
// _restoreFmt args: backup path, instance, backup ID, pgdata, threads.
_restoreFmt = "gs_probackup restore --backup-path=%s --instance=%s --backup-id=%s --pgdata=%s --threads=%d 2>&1"
// _initFmt args: backup path.
_initFmt = "gs_probackup init --backup-path=%s 2>&1"
// _rmDirFmt args: directory to remove recursively.
_rmDirFmt = "rm -r %s"
// _addInstanceFmt args: backup path, instance, pgdata.
_addInstanceFmt = "gs_probackup add-instance --backup-path=%s --instance=%s --pgdata=%s 2>&1"
// _delInstanceFmt args: backup path, instance.
_delInstanceFmt = "gs_probackup del-instance --backup-path=%s --instance=%s 2>&1"
// _startOpenGaussFmt, _stopOpenGaussFmt and _statusGaussFmt args: pgdata.
_startOpenGaussFmt = "gs_ctl start --pgdata=%s"
_stopOpenGaussFmt = "gs_ctl stop --pgdata=%s"
_statusGaussFmt = "gs_ctl status --pgdata=%s"
// _showListFmt args: instance, backup path (note the instance comes first).
_showListFmt = "gs_probackup show --instance=%s --backup-path=%s --format=json 2>&1"
// _mvFmt args: source path, destination path.
_mvFmt = "mv %s %s"
// _CmdErrorFmt is the shared log message for a failed cmds.Exec call;
// args: shell, command, error.
_CmdErrorFmt = "cmds.Exec[shell=%s,cmd=%s] return err wrap: %s"
)
// AsyncBackup runs `gs_probackup backup` through cmds.AsyncExec and consumes
// its output line by line until the output channel is closed.
//
// The backup ID is parsed from the line containing "INFO: Backup start". If no
// such line is seen, the returned ID is empty and the error is nil.
//
// Errors:
//   - if cmds.AsyncExec fails, the returned error wraps
//     cons.CmdAsyncBackupFailed and carries the underlying cause in its text;
//   - if any output line carries an error, that error is returned as is;
//   - if the backup ID cannot be parsed, the getBackupID error is returned.
//
// NOTE(review): the early returns inside the loop leave outputs undrained;
// whether that can block the producer depends on cmds.AsyncExec — confirm.
func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8, dbPort uint16) (string, error) {
	cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode, og.pgData, threadsNum, dbPort)
	outputs, err := cmds.AsyncExec(og.shell, cmd)
	if err != nil {
		// Keep the sentinel in the chain for errors.Is, but do not lose the
		// reason the command could not be started.
		return "", fmt.Errorf("cmds.AsyncExec[shell=%s,cmd=%s] return err: %s, wrap: %w", og.shell, cmd, err, cons.CmdAsyncBackupFailed)
	}

	var bid string
	for output := range outputs {
		og.log.
			Field("backup_path", backupPath).
			Field("instance", instanceName).
			Field("backup_mode", backupMode).
			Field("pgdata", og.pgData).
			Debug(fmt.Sprintf("AsyncBackup output[lineNo=%d,msg=%s,err=%v]", output.LineNo, output.Message, output.Error))

		if output.Error != nil {
			og.log.Error(fmt.Sprintf("output.Error[%s] is not nil", output.Error))
			return "", output.Error
		}

		if strings.Contains(output.Message, "INFO: Backup start") {
			bid, err = og.getBackupID(output.Message)
			if err != nil {
				og.log.Error(fmt.Sprintf("og.getBackupID[source=%s] return err wrap: %s", output.Message, err))
				return "", err
			}
		}
	}

	return bid, nil //nolint
}
// ShowBackup formats the `gs_probackup show` command for a single backup ID,
// hands it to showbackup and returns the first entry of the resulting list.
//
// An error from showbackup is returned unchanged. When showbackup reports no
// error but yields an empty list, the result is a nil backup together with
// that same (nil) error value.
//
//nolint:dupl
func (og *openGauss) ShowBackup(backupPath, instanceName, backupID string) (*model.Backup, error) {
	backups, err := og.showbackup(fmt.Sprintf(_showFmt, instanceName, backupPath, backupID), instanceName)
	switch {
	case err != nil:
		return nil, err
	case len(backups) > 0:
		return backups[0], nil
	default:
		return nil, err
	}
}
// DelBackup runs `gs_probackup delete` for the given backup path, instance and
// backup ID. A failure of cmds.Exec is logged and returned unchanged.
func (og *openGauss) DelBackup(backupPath, instanceName, backupID string) error {
	command := fmt.Sprintf(_delBackupFmt, backupPath, instanceName, backupID)
	if _, execErr := cmds.Exec(og.shell, command); execErr != nil {
		og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, command, execErr))
		return execErr
	}
	return nil
}
// Init runs `gs_probackup init` for backupPath and logs the command output at
// debug level.
//
// Any error from cmds.Exec is logged and returned unchanged. Errors matching
// cons.CmdOperateFailed are logged with the cons.BackupPathAlreadyExist hint;
// all others are logged with the generic command-failure message.
func (og *openGauss) Init(backupPath string) error {
	command := fmt.Sprintf(_initFmt, backupPath)
	out, execErr := cmds.Exec(og.shell, command)
	og.log.Debug(fmt.Sprintf("Init output[msg=%s,err=%v]", out, execErr))

	switch {
	case errors.Is(execErr, cons.CmdOperateFailed):
		og.log.Error(fmt.Sprintf("init backup path failure,err: %s, wrap: %s", execErr, cons.BackupPathAlreadyExist))
		return execErr
	case execErr != nil:
		og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, command, execErr))
		return execErr
	default:
		return nil
	}
}
// deinit removes backupPath recursively with `rm -r`.
//
// Only paths under "/home/omm/" are accepted; anything else, and any path
// containing a ".." segment, is rejected with cons.NoPermission. A failure of
// cmds.Exec is logged and returned unchanged.
//
// NOTE(review): backupPath is interpolated unquoted into the shell command, so
// callers must not pass untrusted input containing shell metacharacters.
func (og *openGauss) deinit(backupPath string) error {
	if !strings.HasPrefix(backupPath, "/home/omm/") {
		return cons.NoPermission
	}
	// A ".." segment lets a path that starts with the allowed prefix resolve
	// outside of it (e.g. "/home/omm/../../etc"), defeating the prefix check.
	for _, seg := range strings.Split(backupPath, "/") {
		if seg == ".." {
			return cons.NoPermission
		}
	}

	cmd := fmt.Sprintf(_rmDirFmt, backupPath)
	if _, err := cmds.Exec(og.shell, cmd); err != nil {
		og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, cmd, err))
		return err
	}
	return nil
}
// AddInstance runs `gs_probackup add-instance` for the configured pgData and
// logs the command output at debug level.
//
// An error matching cons.CmdOperateFailed is logged and returned as a new
// error wrapping cons.InstanceAlreadyExist. Any other error is logged and
// returned unchanged.
func (og *openGauss) AddInstance(backupPath, instance string) error {
	command := fmt.Sprintf(_addInstanceFmt, backupPath, instance, og.pgData)
	out, execErr := cmds.Exec(og.shell, command)
	og.log.Debug(fmt.Sprintf("AddInstance[output=%s,err=%v]", out, execErr))

	switch {
	case errors.Is(execErr, cons.CmdOperateFailed):
		og.log.Error(fmt.Sprintf("add instance failure[output=%s], err: %s, wrap: %s", out, execErr, cons.InstanceAlreadyExist))
		return fmt.Errorf("add instance failure[output=%s], err: %s, wrap: %w", out, execErr, cons.InstanceAlreadyExist)
	case execErr != nil:
		og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, command, cons.CmdAddInstanceFailed))
		return execErr
	}
	return nil
}
// DelInstance runs `gs_probackup del-instance` and logs the command output at
// debug level.
//
// Any error from cmds.Exec is logged and returned unchanged. Errors matching
// cons.CmdOperateFailed are logged with the cons.InstanceNotExist hint; all
// others are logged with the generic command-failure message.
func (og *openGauss) DelInstance(backupPath, instance string) error {
	command := fmt.Sprintf(_delInstanceFmt, backupPath, instance)
	out, execErr := cmds.Exec(og.shell, command)
	og.log.Debug(fmt.Sprintf("DelInstance[output=%s,err=%v]", out, execErr))

	switch {
	case errors.Is(execErr, cons.CmdOperateFailed):
		og.log.Error(fmt.Sprintf("delete instance failure[output=%s], err: %s, wrap: %s", out, execErr, cons.InstanceNotExist))
	case execErr != nil:
		og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, command, cons.CmdDelInstanceFailed))
	}
	// execErr is nil when neither case above matched.
	return execErr
}
// Start runs `gs_ctl start` on the configured pgData and logs the command
// output at debug level.
//
// Any error from cmds.Exec is logged and returned unchanged. Errors matching
// cons.CmdOperateFailed are logged with the cons.StartOpenGaussFailed hint;
// all others with cons.CmdStartOpenGaussFailed.
//
//nolint:dupl
func (og *openGauss) Start() error {
	command := fmt.Sprintf(_startOpenGaussFmt, og.pgData)
	out, execErr := cmds.Exec(og.shell, command)
	og.log.Debug(fmt.Sprintf("Start openGauss[output=%s]", out))

	switch {
	case errors.Is(execErr, cons.CmdOperateFailed):
		og.log.Error(fmt.Sprintf("start openGauss failure[output=%s], err: %s, wrap: %s", out, execErr, cons.StartOpenGaussFailed))
	case execErr != nil:
		og.log.Error(fmt.Sprintf("cmds.Exec[shell=%s,cmd=%s] output=%s return err wrap: %s", og.shell, command, out, cons.CmdStartOpenGaussFailed))
	}
	// execErr is nil when neither case above matched.
	return execErr
}
// Stop runs `gs_ctl stop` on the configured pgData and logs the command output
// at debug level.
//
// Any error from cmds.Exec is logged and returned unchanged. Errors matching
// cons.CmdOperateFailed are logged with the cons.StopOpenGaussFailed hint; all
// others with cons.CmdStopOpenGaussFailed.
//
//nolint:dupl
func (og *openGauss) Stop() error {
	command := fmt.Sprintf(_stopOpenGaussFmt, og.pgData)
	out, execErr := cmds.Exec(og.shell, command)
	og.log.Debug(fmt.Sprintf("Stop openGauss[output=%s]", out))

	switch {
	case errors.Is(execErr, cons.CmdOperateFailed):
		og.log.Error(fmt.Sprintf("stop openGauss failure[output=%s], err: %s, wrap: %s", out, execErr, cons.StopOpenGaussFailed))
	case execErr != nil:
		og.log.Error(fmt.Sprintf("cmds.Exec[shell=%s,cmd=%s] output=%s return err wrap: %s", og.shell, command, out, cons.CmdStopOpenGaussFailed))
	}
	// execErr is nil when neither case above matched.
	return execErr
}
// Status runs `gs_ctl status` on the configured pgData and maps the result to
// a server state:
//
//   - "Running", nil when the command succeeds and its output contains
//     "server is running";
//   - "Stopped", nil when the command fails with cons.CmdOperateFailed and the
//     error text contains "no server running";
//   - "" and the command error for any other command failure;
//   - "" and cons.UnknownOgStatus when the command succeeds but the output is
//     not recognised.
func (og *openGauss) Status() (string, error) {
	command := fmt.Sprintf(_statusGaussFmt, og.pgData)
	out, execErr := cmds.Exec(og.shell, command)
	og.log.Debug(fmt.Sprintf("Status openGauss[output=%s]", out))

	switch {
	case errors.Is(execErr, cons.CmdOperateFailed):
		if strings.Contains(execErr.Error(), "no server running") {
			return "Stopped", nil
		}
		og.log.Error(fmt.Sprintf("get openGauss status failure[output=%s], err: %s, wrap: %s", out, execErr, cons.StopOpenGaussFailed))
		return "", execErr
	case execErr != nil:
		og.log.Error(fmt.Sprintf("cmds.Exec[shell=%s,cmd=%s] output=%s return err wrap: %s", og.shell, command, out, cons.CmdStatusOpenGaussFailed))
		return "", execErr
	case strings.Contains(out, "server is running"):
		return "Running", nil
	}
	return "", cons.UnknownOgStatus
}
// Restore runs `gs_probackup restore` into the configured pgData through
// cmds.AsyncExec and consumes its output until the output channel is closed,
// logging every line at debug level.
//
// Errors:
//   - if cmds.AsyncExec fails, its error is logged and returned before any
//     output is read;
//   - if any output line carries an error, that error is logged and returned.
//
// TODO: dependent environments require integration testing.
func (og *openGauss) Restore(backupPath, instance, backupID string, threadsNum uint8) error {
	cmd := fmt.Sprintf(_restoreFmt, backupPath, instance, backupID, og.pgData, threadsNum)
	outputs, err := cmds.AsyncExec(og.shell, cmd)
	// Check the start-up error before ranging: ranging over a nil channel
	// blocks forever, and ranging over an empty closed one would skip the
	// check entirely and report success.
	if err != nil {
		og.log.Error(fmt.Sprintf("cmds.AsyncExec[shell=%s,cmd=%s] return err: %s, wrap: %s", og.shell, cmd, err, cons.RestoreFailed))
		return err
	}

	for output := range outputs {
		og.log.
			//nolint:exhaustive
			Fields(map[logging.FieldKey]string{
				"backup_path": backupPath,
				"instance":    instance,
				"backup_id":   backupID,
			}).
			Debug(fmt.Sprintf("Restore openGauss[lineNo=%d,msg=%s]", output.LineNo, output.Message))

		if output.Error != nil {
			og.log.Error(fmt.Sprintf("cmds.AsyncExec outputs: Error[%s] is not nil, wrap: %s", output.Error, cons.RestoreFailed))
			return output.Error
		}
	}
	return nil
}
// showbackup runs cmd (a `gs_probackup show ... --format=json` command line),
// decodes its JSON output into a list of per-instance backup lists and returns
// the backups of the entry whose Instance equals instanceName.
//
// Errors:
//   - a cmds.Exec failure is logged and returned unchanged;
//   - a JSON decoding failure is logged and returned unchanged;
//   - if the instance is absent from the output, or present with no backups,
//     the returned error wraps cons.DataNotFound.
//
// A nil error therefore always comes with a non-empty slice.
//
//nolint:dupl
func (og *openGauss) showbackup(cmd, instanceName string) ([]*model.Backup, error) {
	output, err := cmds.Exec(og.shell, cmd)
	if err != nil {
		og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, cmd, cons.CmdShowBackupFailed))
		return nil, err
	}

	var list []*model.BackupList
	if err = json.Unmarshal([]byte(output), &list); err != nil {
		og.log.Error(fmt.Sprintf("json.Unmarshal[output=%s] return err: %s, wrap: %s", output, err, cons.JSONUnmarshalFailed))
		return nil, err
	}

	for _, ins := range list {
		if ins.Instance != instanceName {
			continue
		}
		if len(ins.List) == 0 {
			og.log.Error(fmt.Sprintf("instance[name=%s], backupList[v=%+v], err wrap: %s", ins.Instance, list, cons.DataNotFound))
			// err is nil at this point; report the condition instead of
			// handing the caller a nil slice with a nil error.
			return nil, fmt.Errorf("instance[name=%s] has no backup, err wrap: %w", ins.Instance, cons.DataNotFound)
		}
		return ins.List, nil
	}

	og.log.Error(fmt.Sprintf("backupList[v=%+v], err wrap: %s", list, cons.DataNotFound))
	return nil, fmt.Errorf("instance[name=%s] not found in backup list, err wrap: %w", instanceName, cons.DataNotFound)
}
// ShowBackupList formats the `gs_probackup show` command for a whole instance
// and returns whatever showbackup yields for it.
func (og *openGauss) ShowBackupList(backupPath, instanceName string) ([]*model.Backup, error) {
	return og.showbackup(fmt.Sprintf(_showListFmt, instanceName, backupPath), instanceName)
}
// ignore receives from outputs and discards every value until the channel is
// closed. Any panic raised while doing so is recovered and dropped.
//
//nolint:unused
func (og *openGauss) ignore(outputs chan *cmds.Output) {
	defer func() {
		_ = recover()
	}()

	for {
		if _, open := <-outputs; !open {
			// outputs closed
			return
		}
		// ignore all
	}
}
// getBackupID extracts the backup ID from msg: the run of word characters that
// follows "backup ID:" plus whitespace and is immediately followed by a comma.
//
// Errors:
//   - a matcher failure is logged and returned unchanged;
//   - if msg contains no backup ID, the returned error wraps
//     cons.UnmatchBackupID.
func (og *openGauss) getBackupID(msg string) (string, error) {
	re := regexp2.MustCompile("(?<=backup ID:\\s+)\\w+(?=,)", 0)
	match, err := re.FindStringMatch(msg)
	if err != nil {
		og.log.Error(fmt.Sprintf("unmatch any backup id[msg=%s], err: %s", msg, err))
		return "", err
	}
	if match == nil || match.Length == 0 {
		og.log.Error(fmt.Sprintf("unmatch any backup id,match.lenght is 0, err wrap: %s", cons.UnmatchBackupID))
		// err is nil here; return the sentinel so the caller does not treat an
		// empty ID as a success.
		return "", fmt.Errorf("unmatch any backup id[msg=%s], err wrap: %w", msg, cons.UnmatchBackupID)
	}
	return match.String(), nil
}
// Auth checks that the given credentials can reach the database: it validates
// the inputs, opens a connection through gsutil.Open and pings it.
//
// Errors:
//   - if user, password or dbName is empty after trimming spaces, or dbPort is
//     0, the returned error wraps cons.MissingDBInformation;
//   - a gsutil.Open or Ping failure is logged and returned unchanged.
//
// The password itself is never written to an error or log message; only its
// length is reported.
//
// NOTE(review): the handle returned by gsutil.Open is not closed here —
// confirm whether it needs to be.
func (og *openGauss) Auth(user, password, dbName string, dbPort uint16) error {
	if strings.Trim(user, " ") == "" ||
		strings.Trim(password, " ") == "" ||
		strings.Trim(dbName, " ") == "" ||
		dbPort == 0 {
		// The password may be non-empty on this path (e.g. only user is
		// missing), so report its length rather than its value.
		return fmt.Errorf("invalid inputs[user=%s,pw length=%d,dbName=%s,dbPort=%d], err wrap: %w", user, len(password), dbName, dbPort, cons.MissingDBInformation)
	}

	_og, err := gsutil.Open(user, password, dbName, dbPort)
	if err != nil {
		og.log.Error(fmt.Sprintf("gsutil.Open failure,err=%s", err))
		return err
	}

	if err := _og.Ping(); err != nil {
		og.log.Error(fmt.Sprintf("ping openGauss fail[user=%s,pw length=%d,dbName=%s], err wrap: %s", user, len(password), dbName, err))
		return err
	}
	return nil
}
// MvPgDataToTemp runs `mv` to move the pgData directory to the temp path.
//
// Any error from cmds.Exec is logged and returned unchanged. Errors matching
// cons.CmdOperateFailed are logged with the cons.MvPgDataToTempFailed hint;
// all others with the generic command-failure message.
func (og *openGauss) MvPgDataToTemp() error {
	command := fmt.Sprintf(_mvFmt, og.pgData, og.pgDataTemp)
	_, execErr := cmds.Exec(og.shell, command)

	switch {
	case errors.Is(execErr, cons.CmdOperateFailed):
		og.log.Error(fmt.Sprintf("mv pgdata to temp dir failure, err: %s, wrap: %s", execErr, cons.MvPgDataToTempFailed))
	case execErr != nil:
		og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, command, execErr))
	}
	// execErr is nil when neither case above matched.
	return execErr
}
// MvTempToPgData runs `mv` to move the temp path back to the pgData directory.
//
// Any error from cmds.Exec is logged and returned unchanged. Errors matching
// cons.CmdOperateFailed are logged with the cons.MvTempToPgDataFailed hint;
// all others with the generic command-failure message.
func (og *openGauss) MvTempToPgData() error {
	command := fmt.Sprintf(_mvFmt, og.pgDataTemp, og.pgData)
	_, execErr := cmds.Exec(og.shell, command)

	switch {
	case errors.Is(execErr, cons.CmdOperateFailed):
		og.log.Error(fmt.Sprintf("mv temp to pgdata dir failure, err: %s, wrap: %s", execErr, cons.MvTempToPgDataFailed))
	case execErr != nil:
		og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, command, execErr))
	}
	// execErr is nil when neither case above matched.
	return execErr
}
// CleanPgDataTemp runs `rm -r` on the temp path.
//
// Any error from cmds.Exec is logged and returned unchanged. Errors matching
// cons.CmdOperateFailed are logged with the cons.CleanPgDataTempFailed hint;
// all others with the generic command-failure message.
func (og *openGauss) CleanPgDataTemp() error {
	command := fmt.Sprintf(_rmDirFmt, og.pgDataTemp)
	_, execErr := cmds.Exec(og.shell, command)

	switch {
	case errors.Is(execErr, cons.CmdOperateFailed):
		og.log.Error(fmt.Sprintf("clean pgdata temp dir failure, err: %s, wrap: %s", execErr, cons.CleanPgDataTempFailed))
	case execErr != nil:
		og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, command, execErr))
	}
	// execErr is nil when neither case above matched.
	return execErr
}
// CheckSchema opens a connection through gsutil.Open with the given
// credentials and calls its CheckSchema with schema.
//
// A failure of either step is logged and returned unchanged. The handle
// returned by gsutil.Open is not closed by this method.
func (og *openGauss) CheckSchema(user, password, dbName string, dbPort uint16, schema string) error {
	conn, openErr := gsutil.Open(user, password, dbName, dbPort)
	if openErr != nil {
		og.log.Error(fmt.Sprintf("gsutil.Open failure, err wrap: %s", openErr))
		return openErr
	}

	checkErr := conn.CheckSchema(schema)
	if checkErr == nil {
		return nil
	}
	og.log.Error(fmt.Sprintf("check openGauss schema fail[user=%s,dbName=%s, schema=%s], err wrap: %s", user, dbName, schema, checkErr))
	return checkErr
}