| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package client |
| |
| import ( |
| "context" |
| "fmt" |
| "reflect" |
| "time" |
| |
| "github.com/apache/incubator-pegasus/admin-cli/util" |
| "github.com/apache/incubator-pegasus/go-client/idl/admin" |
| "github.com/apache/incubator-pegasus/go-client/idl/base" |
| "github.com/apache/incubator-pegasus/go-client/idl/replication" |
| "github.com/apache/incubator-pegasus/go-client/session" |
| ) |
| |
| type BalanceType int |
| |
| const ( |
| BalanceMovePri BalanceType = iota |
| BalanceCopyPri |
| BalanceCopySec |
| ) |
| |
| func (t BalanceType) String() string { |
| switch t { |
| case BalanceMovePri: |
| return "MovePri" |
| case BalanceCopyPri: |
| return "CopyPri" |
| case BalanceCopySec: |
| return "CopySec" |
| default: |
| panic(fmt.Sprintf("unexpected BalanceType: %d", t)) |
| } |
| } |
| |
| // Meta is a helper over pegasus-go-client's primitive session.MetaManager. |
| // It aims to provide an easy-to-use API that eliminates some boilerplate code, like |
| // context creation, request/response creation, etc. |
| type Meta interface { |
| Close() error |
| |
| // ListAvailableApps lists only available tables. |
| ListAvailableApps() ([]*admin.AppInfo, error) |
| |
| ListApps(status admin.AppStatus) ([]*admin.AppInfo, error) |
| |
| QueryConfig(tableName string) (*replication.QueryCfgResponse, error) |
| |
| MetaControl(level admin.MetaFunctionLevel) (oldLevel admin.MetaFunctionLevel, err error) |
| |
| QueryClusterInfo() (map[string]string, error) |
| |
| UpdateAppEnvs(tableName string, envs map[string]string) error |
| |
| ClearAppEnvs(tableName string, clearPrefix string) error |
| |
| DelAppEnvs(tableName string, keys []string) error |
| |
| CreateApp(tableName string, envs map[string]string, partitionCount int) (int32, error) |
| |
| DropApp(tableName string, reserveSeconds int64) error |
| |
| ModifyDuplication(tableName string, dupid int, status admin.DuplicationStatus) error |
| |
| AddDuplication(tableName string, remoteCluster string, duplicateCheckpoint bool) (*admin.DuplicationAddResponse, error) |
| |
| QueryDuplication(tableName string) (*admin.DuplicationQueryResponse, error) |
| |
| ListNodes() ([]*admin.NodeInfo, error) |
| |
| RecallApp(originTableID int, newTableName string) (*admin.AppInfo, error) |
| |
| Balance(gpid *base.Gpid, opType BalanceType, from *util.PegasusNode, to *util.PegasusNode) error |
| |
| Propose(gpid *base.Gpid, action admin.ConfigType, target *util.PegasusNode, node *util.PegasusNode) error |
| |
| StartBackupApp(tableID int, providerType string, backupPath string) (*admin.StartBackupAppResponse, error) |
| |
| QueryBackupStatus(tableID int, backupID int64) (*admin.QueryBackupStatusResponse, error) |
| |
| RestoreApp(oldClusterName string, oldTableName string, oldTableID int, backupID int64, providerType string, |
| newTableName string, restorePath string, skipBadPartition bool, policyName string) (*admin.CreateAppResponse, error) |
| |
| StartPartitionSplit(tableName string, newPartitionCount int) error |
| |
| QuerySplitStatus(tableName string) (*admin.QuerySplitResponse, error) |
| |
| PausePartitionSplit(tableName string, parentPidx int) error |
| |
| RestartPartitionSplit(tableName string, parentPidx int) error |
| |
| CancelPartitionSplit(tableName string, oldPartitionCount int) error |
| |
| StartBulkLoad(tableName string, clusterName string, providerType string, rootPath string) error |
| |
| QueryBulkLoad(tableName string) (*admin.QueryBulkLoadResponse, error) |
| |
| PauseBulkLoad(tableName string) error |
| |
| RestartBulkLoad(tableName string) error |
| |
| CancelBulkLoad(tableName string, forced bool) error |
| |
| ClearBulkLoad(tableName string) error |
| |
| StartManualCompaction(tableName string, targetLevel int, maxRunningCount int, bottommost bool) error |
| |
| QueryManualCompaction(tableName string) (*admin.QueryAppManualCompactResponse, error) |
| } |
| |
| type rpcBasedMeta struct { |
| meta *session.MetaManager |
| } |
| |
| // NewRPCBasedMeta creates the connection to meta. |
| func NewRPCBasedMeta(metaAddrs []string) Meta { |
| return &rpcBasedMeta{ |
| meta: session.NewMetaManager(metaAddrs, session.NewNodeSession), |
| } |
| } |
| |
| // Some responses have not only error-code but also a string-type "hint" that can tells the error details. |
| // This function wraps the "hint" into error. |
| func wrapHintIntoError(hint string, err error) error { |
| if err != nil { |
| if hint != "" { |
| return fmt.Errorf("%s [hint: %s]", err, hint) |
| } |
| } |
| return err |
| } |
| |
| func (m *rpcBasedMeta) Close() error { |
| return m.meta.Close() |
| } |
| |
| // `callback` always accepts non-nil `resp`. |
| func (m *rpcBasedMeta) callMeta(methodName string, req interface{}, callback func(resp interface{})) error { |
| ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) |
| defer cancel() |
| |
| ret := reflect.ValueOf(m.meta).MethodByName(methodName).Call([]reflect.Value{ |
| reflect.ValueOf(ctx), |
| reflect.ValueOf(req), |
| }) |
| |
| // the last returned value is always error |
| ierr := ret[len(ret)-1].Interface() |
| var err error |
| if ierr != nil { |
| err = ierr.(error) |
| } |
| |
| if len(ret) == 1 { |
| return err |
| } |
| |
| // len(ret) == 2 |
| if !ret[0].IsNil() { |
| callback(ret[0].Interface()) |
| } |
| return err |
| } |
| |
| func (m *rpcBasedMeta) ListAvailableApps() ([]*admin.AppInfo, error) { |
| return m.ListApps(admin.AppStatus_AS_AVAILABLE) |
| } |
| |
| func (m *rpcBasedMeta) ListApps(status admin.AppStatus) ([]*admin.AppInfo, error) { |
| var result []*admin.AppInfo |
| req := &admin.ListAppsRequest{Status: status} |
| err := m.callMeta("ListApps", req, func(resp interface{}) { |
| result = resp.(*admin.ListAppsResponse).Infos |
| }) |
| return result, err |
| } |
| |
| func (m *rpcBasedMeta) QueryConfig(tableName string) (*replication.QueryCfgResponse, error) { |
| var result *replication.QueryCfgResponse |
| err := m.callMeta("QueryConfig", tableName, func(resp interface{}) { |
| result = resp.(*replication.QueryCfgResponse) |
| }) |
| if err == nil { |
| if result.GetErr().Errno == base.ERR_OBJECT_NOT_FOUND.String() { |
| return nil, fmt.Errorf("table(%s) doesn't exist", tableName) |
| } |
| if result.GetErr().Errno != base.ERR_OK.String() { |
| return nil, fmt.Errorf("query config failed: %s", result.GetErr()) |
| } |
| } |
| return result, err |
| } |
| |
| func (m *rpcBasedMeta) MetaControl(level admin.MetaFunctionLevel) (oldLevel admin.MetaFunctionLevel, err error) { |
| req := &admin.MetaControlRequest{Level: level} |
| err = m.callMeta("MetaControl", req, func(resp interface{}) { |
| oldLevel = resp.(*admin.MetaControlResponse).OldLevel |
| }) |
| return oldLevel, err |
| } |
| |
| func (m *rpcBasedMeta) QueryClusterInfo() (map[string]string, error) { |
| result := make(map[string]string) |
| req := &admin.ClusterInfoRequest{} |
| err := m.callMeta("QueryClusterInfo", req, func(resp interface{}) { |
| keys := resp.(*admin.ClusterInfoResponse).Keys |
| values := resp.(*admin.ClusterInfoResponse).Values |
| for i := range keys { |
| result[keys[i]] = values[i] |
| } |
| }) |
| return result, err |
| } |
| |
| func (m *rpcBasedMeta) updateAppEnvs(req *admin.UpdateAppEnvRequest) error { |
| var hint string |
| err := m.callMeta("UpdateAppEnv", req, func(resp interface{}) { |
| hint = resp.(*admin.UpdateAppEnvResponse).HintMessage |
| }) |
| return wrapHintIntoError(hint, err) |
| } |
| |
| func (m *rpcBasedMeta) UpdateAppEnvs(tableName string, envs map[string]string) error { |
| var keys []string |
| var values []string |
| for key, value := range envs { |
| keys = append(keys, key) |
| values = append(values, value) |
| } |
| req := &admin.UpdateAppEnvRequest{ |
| AppName: tableName, |
| Op: admin.AppEnvOperation_APP_ENV_OP_SET, |
| Keys: keys, |
| Values: values, |
| } |
| return m.updateAppEnvs(req) |
| } |
| |
| func (m *rpcBasedMeta) ClearAppEnvs(tableName string, clearPrefix string) error { |
| req := &admin.UpdateAppEnvRequest{ |
| AppName: tableName, |
| Op: admin.AppEnvOperation_APP_ENV_OP_CLEAR, |
| ClearPrefix: &clearPrefix, |
| } |
| return m.updateAppEnvs(req) |
| } |
| |
| func (m *rpcBasedMeta) DelAppEnvs(tableName string, keys []string) error { |
| req := &admin.UpdateAppEnvRequest{ |
| AppName: tableName, |
| Op: admin.AppEnvOperation_APP_ENV_OP_DEL, |
| Keys: keys, |
| } |
| return m.updateAppEnvs(req) |
| } |
| |
| func (m *rpcBasedMeta) CreateApp(tableName string, envs map[string]string, partitionCount int) (int32, error) { |
| var appID int32 |
| req := &admin.CreateAppRequest{ |
| AppName: tableName, |
| Options: &admin.CreateAppOptions{ |
| PartitionCount: int32(partitionCount), |
| Envs: envs, |
| |
| // constants |
| ReplicaCount: 3, |
| IsStateful: true, |
| AppType: "pegasus", |
| }, |
| } |
| err := m.callMeta("CreateApp", req, func(resp interface{}) { |
| appID = resp.(*admin.CreateAppResponse).Appid |
| }) |
| return appID, err |
| } |
| |
| func (m *rpcBasedMeta) DropApp(tableName string, reserveSeconds int64) error { |
| req := &admin.DropAppRequest{ |
| AppName: tableName, |
| Options: &admin.DropAppOptions{ |
| SuccessIfNotExist: true, |
| ReserveSeconds: &reserveSeconds, |
| }, |
| } |
| err := m.callMeta("DropApp", req, func(resp interface{}) {}) |
| return err |
| } |
| |
| func (m *rpcBasedMeta) ModifyDuplication(tableName string, dupid int, status admin.DuplicationStatus) error { |
| req := &admin.DuplicationModifyRequest{ |
| AppName: tableName, |
| Dupid: int32(dupid), |
| Status: &status, |
| } |
| err := m.callMeta("ModifyDuplication", req, func(resp interface{}) {}) |
| return err |
| } |
| |
| func (m *rpcBasedMeta) AddDuplication(tableName string, remoteCluster string, duplicateCheckpoint bool) (*admin.DuplicationAddResponse, error) { |
| var result *admin.DuplicationAddResponse |
| req := &admin.DuplicationAddRequest{ |
| AppName: tableName, |
| RemoteClusterName: remoteCluster, |
| IsDuplicatingCheckpoint: duplicateCheckpoint, |
| } |
| err := m.callMeta("AddDuplication", req, func(resp interface{}) { |
| result = resp.(*admin.DuplicationAddResponse) |
| }) |
| if result != nil && result.IsSetHint() { |
| return result, wrapHintIntoError(*result.Hint, err) |
| } |
| return result, err |
| } |
| |
| func (m *rpcBasedMeta) QueryDuplication(tableName string) (*admin.DuplicationQueryResponse, error) { |
| var result *admin.DuplicationQueryResponse |
| req := &admin.DuplicationQueryRequest{ |
| AppName: tableName, |
| } |
| err := m.callMeta("QueryDuplication", req, func(resp interface{}) { |
| result = resp.(*admin.DuplicationQueryResponse) |
| }) |
| return result, err |
| } |
| |
| func (m *rpcBasedMeta) ListNodes() ([]*admin.NodeInfo, error) { |
| var result []*admin.NodeInfo |
| req := &admin.ListNodesRequest{ |
| Status: admin.NodeStatus_NS_INVALID, |
| } |
| err := m.callMeta("ListNodes", req, func(resp interface{}) { |
| result = resp.(*admin.ListNodesResponse).Infos |
| }) |
| return result, err |
| } |
| |
| func (m *rpcBasedMeta) RecallApp(originTableID int, newTableName string) (*admin.AppInfo, error) { |
| var result *admin.AppInfo |
| req := &admin.RecallAppRequest{ |
| AppID: int32(originTableID), |
| NewAppName_: newTableName, |
| } |
| err := m.callMeta("RecallApp", req, func(resp interface{}) { |
| result = resp.(*admin.RecallAppResponse).Info |
| }) |
| return result, err |
| } |
| |
| func getNodeAddress(n *util.PegasusNode) *base.RPCAddress { |
| if n == nil { |
| return &base.RPCAddress{} |
| } |
| return base.NewRPCAddress(n.IP, n.Port) |
| } |
| |
| func newProposalAction(target *util.PegasusNode, node *util.PegasusNode, cfgType admin.ConfigType) *admin.ConfigurationProposalAction { |
| return &admin.ConfigurationProposalAction{ |
| Target: getNodeAddress(target), |
| Node: getNodeAddress(node), |
| Type: cfgType, |
| } |
| } |
| |
| func (m *rpcBasedMeta) Balance(gpid *base.Gpid, opType BalanceType, from *util.PegasusNode, to *util.PegasusNode) error { |
| req := &admin.BalanceRequest{ |
| Gpid: gpid, |
| } |
| |
| var actions []*admin.ConfigurationProposalAction |
| switch opType { |
| case BalanceMovePri: |
| actions = append(actions, newProposalAction(from, from, admin.ConfigType_CT_DOWNGRADE_TO_SECONDARY)) |
| actions = append(actions, newProposalAction(to, to, admin.ConfigType_CT_UPGRADE_TO_PRIMARY)) |
| case BalanceCopyPri: |
| actions = append(actions, newProposalAction(from, to, admin.ConfigType_CT_ADD_SECONDARY_FOR_LB)) |
| actions = append(actions, newProposalAction(from, from, admin.ConfigType_CT_DOWNGRADE_TO_SECONDARY)) |
| actions = append(actions, newProposalAction(to, to, admin.ConfigType_CT_UPGRADE_TO_PRIMARY)) |
| actions = append(actions, newProposalAction(to, from, admin.ConfigType_CT_REMOVE)) |
| case BalanceCopySec: |
| actions = append(actions, newProposalAction(nil, to, admin.ConfigType_CT_ADD_SECONDARY_FOR_LB)) |
| actions = append(actions, newProposalAction(nil, from, admin.ConfigType_CT_DOWNGRADE_TO_INACTIVE)) |
| default: |
| return fmt.Errorf("illegal balance type %d", opType) |
| } |
| req.ActionList = actions |
| |
| err := m.callMeta("Balance", req, func(resp interface{}) {}) |
| return err |
| } |
| |
| func (m *rpcBasedMeta) Propose(gpid *base.Gpid, action admin.ConfigType, target *util.PegasusNode, node *util.PegasusNode) error { |
| req := &admin.BalanceRequest{ |
| Gpid: gpid, |
| ActionList: []*admin.ConfigurationProposalAction{ |
| newProposalAction(target, node, action), |
| }, |
| } |
| err := m.callMeta("Balance", req, func(resp interface{}) {}) |
| return err |
| } |
| |
| func (m *rpcBasedMeta) StartBackupApp(tableID int, providerType string, backupPath string) (*admin.StartBackupAppResponse, error) { |
| req := &admin.StartBackupAppRequest{ |
| BackupProviderType: providerType, |
| AppID: int32(tableID), |
| BackupPath: &backupPath, |
| } |
| var result *admin.StartBackupAppResponse |
| err := m.callMeta("StartBackupApp", req, func(resp interface{}) { |
| result = resp.(*admin.StartBackupAppResponse) |
| }) |
| return result, wrapHintIntoError(result.HintMessage, err) |
| } |
| |
| func (m *rpcBasedMeta) QueryBackupStatus(tableID int, backupID int64) (*admin.QueryBackupStatusResponse, error) { |
| var realBackupID *int64 |
| if backupID == 0 { |
| realBackupID = nil |
| } else { |
| realBackupID = &backupID |
| } |
| req := &admin.QueryBackupStatusRequest{ |
| AppID: int32(tableID), |
| BackupID: realBackupID, |
| } |
| var result *admin.QueryBackupStatusResponse |
| err := m.callMeta("QueryBackupStatus", req, func(resp interface{}) { |
| result = resp.(*admin.QueryBackupStatusResponse) |
| }) |
| return result, wrapHintIntoError(result.HintMessage, err) |
| } |
| |
| func (m *rpcBasedMeta) RestoreApp(oldClusterName string, oldTableName string, oldTableID int, |
| backupID int64, providerType string, newTableName string, restorePath string, |
| skipBadPartition bool, policyName string) (*admin.CreateAppResponse, error) { |
| req := &admin.RestoreAppRequest{ |
| ClusterName: oldClusterName, |
| PolicyName: policyName, |
| TimeStamp: backupID, |
| AppName: oldTableName, |
| AppID: int32(oldTableID), |
| NewAppName_: newTableName, |
| BackupProviderName: providerType, |
| SkipBadPartition: skipBadPartition, |
| RestorePath: &restorePath, |
| } |
| var result *admin.CreateAppResponse |
| SetRPCTimeout(time.Second * 20) |
| err := m.callMeta("RestoreApp", req, func(resp interface{}) { |
| result = resp.(*admin.CreateAppResponse) |
| }) |
| return result, err |
| } |
| |
| func (m *rpcBasedMeta) StartPartitionSplit(tableName string, newPartitionCount int) error { |
| req := &admin.StartPartitionSplitRequest{ |
| AppName: tableName, |
| NewPartitionCount_: int32(newPartitionCount), |
| } |
| var result *admin.StartPartitionSplitResponse |
| err := m.callMeta("StartPartitionSplit", req, func(resp interface{}) { |
| result = resp.(*admin.StartPartitionSplitResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) QuerySplitStatus(tableName string) (*admin.QuerySplitResponse, error) { |
| req := &admin.QuerySplitRequest{ |
| AppName: tableName, |
| } |
| var result *admin.QuerySplitResponse |
| err := m.callMeta("QuerySplitStatus", req, func(resp interface{}) { |
| result = resp.(*admin.QuerySplitResponse) |
| }) |
| return result, wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) PausePartitionSplit(tableName string, parentPidx int) error { |
| req := &admin.ControlSplitRequest{ |
| AppName: tableName, |
| ControlType: admin.SplitControlType_PAUSE, |
| ParentPidx: int32(parentPidx), |
| } |
| var result *admin.ControlSplitResponse |
| err := m.callMeta("ControlPartitionSplit", req, func(resp interface{}) { |
| result = resp.(*admin.ControlSplitResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) RestartPartitionSplit(tableName string, parentPidx int) error { |
| req := &admin.ControlSplitRequest{ |
| AppName: tableName, |
| ControlType: admin.SplitControlType_RESTART, |
| ParentPidx: int32(parentPidx), |
| } |
| var result *admin.ControlSplitResponse |
| err := m.callMeta("ControlPartitionSplit", req, func(resp interface{}) { |
| result = resp.(*admin.ControlSplitResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) CancelPartitionSplit(tableName string, oldPartitionCount int) error { |
| var partitionCount int32 = int32(oldPartitionCount) |
| req := &admin.ControlSplitRequest{ |
| AppName: tableName, |
| ControlType: admin.SplitControlType_CANCEL, |
| ParentPidx: -1, |
| OldPartitionCount: &partitionCount, |
| } |
| var result *admin.ControlSplitResponse |
| err := m.callMeta("ControlPartitionSplit", req, func(resp interface{}) { |
| result = resp.(*admin.ControlSplitResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) StartBulkLoad(tableName string, clusterName string, providerType string, rootPath string) error { |
| req := &admin.StartBulkLoadRequest{ |
| AppName: tableName, |
| ClusterName: clusterName, |
| FileProviderType: providerType, |
| RemoteRootPath: rootPath, |
| } |
| var result *admin.StartBulkLoadResponse |
| err := m.callMeta("StartBulkLoad", req, func(resp interface{}) { |
| result = resp.(*admin.StartBulkLoadResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) QueryBulkLoad(tableName string) (*admin.QueryBulkLoadResponse, error) { |
| req := &admin.QueryBulkLoadRequest{ |
| AppName: tableName, |
| } |
| var result *admin.QueryBulkLoadResponse |
| err := m.callMeta("QueryBulkLoadStatus", req, func(resp interface{}) { |
| result = resp.(*admin.QueryBulkLoadResponse) |
| }) |
| return result, wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) PauseBulkLoad(tableName string) error { |
| req := &admin.ControlBulkLoadRequest{ |
| AppName: tableName, |
| Type: admin.BulkLoadControlType_BLC_PAUSE, |
| } |
| var result *admin.ControlBulkLoadResponse |
| err := m.callMeta("ControlBulkLoad", req, func(resp interface{}) { |
| result = resp.(*admin.ControlBulkLoadResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) RestartBulkLoad(tableName string) error { |
| req := &admin.ControlBulkLoadRequest{ |
| AppName: tableName, |
| Type: admin.BulkLoadControlType_BLC_RESTART, |
| } |
| var result *admin.ControlBulkLoadResponse |
| err := m.callMeta("ControlBulkLoad", req, func(resp interface{}) { |
| result = resp.(*admin.ControlBulkLoadResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) CancelBulkLoad(tableName string, forced bool) error { |
| cancelType := admin.BulkLoadControlType_BLC_CANCEL |
| if forced { |
| cancelType = admin.BulkLoadControlType_BLC_FORCE_CANCEL |
| } |
| req := &admin.ControlBulkLoadRequest{ |
| AppName: tableName, |
| Type: cancelType, |
| } |
| var result *admin.ControlBulkLoadResponse |
| err := m.callMeta("ControlBulkLoad", req, func(resp interface{}) { |
| result = resp.(*admin.ControlBulkLoadResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) ClearBulkLoad(tableName string) error { |
| req := &admin.ClearBulkLoadStateRequest{ |
| AppName: tableName, |
| } |
| var result *admin.ClearBulkLoadStateResponse |
| err := m.callMeta("ClearBulkLoad", req, func(resp interface{}) { |
| result = resp.(*admin.ClearBulkLoadStateResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) StartManualCompaction(tableName string, targetLevel int, maxRunningCount int, bottommost bool) error { |
| var level int32 = int32(targetLevel) |
| var count int32 = int32(maxRunningCount) |
| req := &admin.StartAppManualCompactRequest{ |
| AppName: tableName, |
| TargetLevel: &level, |
| Bottommost: &bottommost, |
| MaxRunningCount: &count, |
| } |
| var result *admin.StartAppManualCompactResponse |
| err := m.callMeta("StartManualCompact", req, func(resp interface{}) { |
| result = resp.(*admin.StartAppManualCompactResponse) |
| }) |
| return wrapHintIntoError(result.GetHintMsg(), err) |
| } |
| |
| func (m *rpcBasedMeta) QueryManualCompaction(tableName string) (*admin.QueryAppManualCompactResponse, error) { |
| req := &admin.QueryAppManualCompactRequest{ |
| AppName: tableName, |
| } |
| var result *admin.QueryAppManualCompactResponse |
| err := m.callMeta("QueryManualCompact", req, func(resp interface{}) { |
| result = resp.(*admin.QueryAppManualCompactResponse) |
| }) |
| return result, wrapHintIntoError(result.GetHintMsg(), err) |
| } |