feat(topic transfer): add CRD and controller for topic transfer
diff --git a/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml b/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml
new file mode 100644
index 0000000..7c3a4f7
--- /dev/null
+++ b/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml
@@ -0,0 +1,51 @@
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: topictransfers.rocketmq.apache.org
+spec:
+ group: rocketmq.apache.org
+ names:
+ kind: TopicTransfer
+ listKind: TopicTransferList
+ plural: topictransfers
+ singular: topictransfer
+ scope: Namespaced
+ subresources:
+ status: {}
+ validation:
+ openAPIV3Schema:
+ properties:
+ apiVersion:
+ description: 'APIVersion defines the versioned schema of this representation
+ of an object. Servers should convert recognized schemas to the latest
+ internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
+ type: string
+ kind:
+ description: 'Kind is a string value representing the REST resource this
+ object represents. Servers may infer this from the endpoint the client
+ submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
+ type: string
+ metadata:
+ type: object
+ spec:
+ properties:
+ consumerGroup:
+ description: Consumer group
+ type: string
+ sourceCluster:
+ description: The cluster where the transferred topic from
+ type: string
+ targetCluster:
+ description: The cluster where the topic will be transferred to
+ type: string
+ topic:
+ description: Topic name
+ type: string
+ type: object
+ status:
+ type: object
+ version: v1alpha1
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
diff --git a/deploy/role.yaml b/deploy/role.yaml
index 0cfc34c..a27f75f 100644
--- a/deploy/role.yaml
+++ b/deploy/role.yaml
@@ -53,5 +53,6 @@
- '*'
- brokers
- pods/exec
+ - topictransfers
verbs:
- '*'
diff --git a/example/rocketmq_v1alpha1_topictransfer_cr.yaml b/example/rocketmq_v1alpha1_topictransfer_cr.yaml
new file mode 100644
index 0000000..4d05b34
--- /dev/null
+++ b/example/rocketmq_v1alpha1_topictransfer_cr.yaml
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: rocketmq.apache.org/v1alpha1
+kind: TopicTransfer
+metadata:
+ name: topictransfer
+spec:
+ # topic defines which topic to be transferred
+ topic: TopicTest
+ # consumerGroup defines the consumer group
+ consumerGroup: please_rename_unique_group_name_4
+ # sourceCluster define the source cluster
+ sourceCluster: broker
+ # targetCluster defines the target cluster
+ targetCluster: targetbroker
diff --git a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go
new file mode 100644
index 0000000..fbcb2f3
--- /dev/null
+++ b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package v1alpha1
+
+import (
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
+// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
+
+// TopicTransferSpec defines the desired state of TopicTransfer
+// +k8s:openapi-gen=true
+type TopicTransferSpec struct {
+ // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
+ // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
+ // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
+
+ // Topic name
+ Topic string `json:"topic,omitempty"`
+ // Consumer group
+ ConsumerGroup string `json:"consumerGroup,omitempty"`
+ // The cluster where the transferred topic from
+ SourceCluster string `json:"sourceCluster,omitempty"`
+ // The cluster where the topic will be transferred to
+ TargetCluster string `json:"targetCluster,omitempty"`
+}
+
+// TopicTransferStatus defines the observed state of TopicTransfer
+// +k8s:openapi-gen=true
+type TopicTransferStatus struct {
+ // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
+ // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
+ // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
+}
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// TopicTransfer is the Schema for the topictransfers API
+// +k8s:openapi-gen=true
+// +kubebuilder:subresource:status
+type TopicTransfer struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ObjectMeta `json:"metadata,omitempty"`
+
+ Spec TopicTransferSpec `json:"spec,omitempty"`
+ Status TopicTransferStatus `json:"status,omitempty"`
+}
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// TopicTransferList contains a list of TopicTransfer
+type TopicTransferList struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ListMeta `json:"metadata,omitempty"`
+ Items []TopicTransfer `json:"items"`
+}
+
+func init() {
+ SchemeBuilder.Register(&TopicTransfer{}, &TopicTransferList{})
+}
diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go
index f6ed7a4..ee349ed 100644
--- a/pkg/constants/constants.go
+++ b/pkg/constants/constants.go
@@ -49,4 +49,5 @@
const RestartBrokerPodIntervalInSecond = 30
const MinMetadataJsonFileSize = 5
+const MinIpListLength = 8
diff --git a/pkg/controller/add_topictransfer.go b/pkg/controller/add_topictransfer.go
new file mode 100644
index 0000000..8d9bbc3
--- /dev/null
+++ b/pkg/controller/add_topictransfer.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+ "github.com/apache/rocketmq-operator/pkg/controller/topictransfer"
+)
+
+func init() {
+ // AddToManagerFuncs is a list of functions to create controllers and add them to a manager.
+ AddToManagerFuncs = append(AddToManagerFuncs, topictransfer.Add)
+}
diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go
index aabe517..75ee44e 100644
--- a/pkg/controller/broker/broker_controller.go
+++ b/pkg/controller/broker/broker_controller.go
@@ -260,7 +260,6 @@
log.Info("subscriptionGroupCommand: " + subscriptionGroupCommand)
MakeConfigDirCommand := "mkdir -p " + cons.StoreConfigDir
ChmodDirCommand := "chmod a+rw " + cons.StoreConfigDir
- log.Info("ChmodDirCommand: " + ChmodDirCommand)
cmd = []string{"/bin/bash", "-c", MakeConfigDirCommand + " && " + ChmodDirCommand + " && " + topicsCommand + " && " + subscriptionGroupCommand}
}
}
diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go
index 6c0db47..8385350 100644
--- a/pkg/controller/nameservice/nameservice_controller.go
+++ b/pkg/controller/nameservice/nameservice_controller.go
@@ -188,9 +188,9 @@
share.NameServersStr = nameServerListStr[:len(nameServerListStr)-1]
reqLogger.Info("share.NameServersStr:" + share.NameServersStr)
- if len(oldNameServerListStr) < 8 {
+ if len(oldNameServerListStr) <= cons.MinIpListLength {
oldNameServerListStr = share.NameServersStr
- } else if len(share.NameServersStr) > 8 {
+ } else if len(share.NameServersStr) > cons.MinIpListLength {
oldNameServerListStr = oldNameServerListStr[:len(oldNameServerListStr)-1]
share.IsNameServersStrUpdated = true
}
@@ -206,7 +206,7 @@
}
// use admin tool to update broker config
- if share.IsNameServersStrUpdated && (len(oldNameServerListStr) > 8) && (len(share.NameServersStr) > 8) {
+ if share.IsNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(share.NameServersStr) > cons.MinIpListLength) {
mqAdmin := cons.AdminToolDir
subCmd := cons.UpdateBrokerConfig
key := cons.ParamNameServiceAddress
diff --git a/pkg/controller/topictransfer/topictransfer_controller.go b/pkg/controller/topictransfer/topictransfer_controller.go
new file mode 100644
index 0000000..33c4ea6
--- /dev/null
+++ b/pkg/controller/topictransfer/topictransfer_controller.go
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package topictransfer
+
+import (
+ "context"
+ "os/exec"
+ "strings"
+ "time"
+
+ rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1"
+ cons "github.com/apache/rocketmq-operator/pkg/constants"
+ "github.com/apache/rocketmq-operator/pkg/share"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+var log = logf.Log.WithName("controller_topictransfer")
+
+/**
+* USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
+* business logic. Delete these comments after modifying this file.*
+ */
+
+// Add creates a new TopicTransfer Controller and adds it to the Manager. The Manager will set fields on the Controller
+// and Start it when the Manager is Started.
+func Add(mgr manager.Manager) error {
+ return add(mgr, newReconciler(mgr))
+}
+
+// newReconciler returns a new reconcile.Reconciler
+func newReconciler(mgr manager.Manager) reconcile.Reconciler {
+ return &ReconcileTopicTransfer{client: mgr.GetClient(), scheme: mgr.GetScheme()}
+}
+
+// add adds a new Controller to mgr with r as the reconcile.Reconciler
+func add(mgr manager.Manager, r reconcile.Reconciler) error {
+ // Create a new controller
+ c, err := controller.New("topictransfer-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to primary resource TopicTransfer
+ err = c.Watch(&source.Kind{Type: &rocketmqv1alpha1.TopicTransfer{}}, &handler.EnqueueRequestForObject{})
+ if err != nil {
+ return err
+ }
+
+ // TODO(user): Modify this to be the types you create that are owned by the primary resource
+ // Watch for changes to secondary resource Pods and requeue the owner TopicTransfer
+ err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
+ IsController: true,
+ OwnerType: &rocketmqv1alpha1.TopicTransfer{},
+ })
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// blank assignment to verify that ReconcileTopicTransfer implements reconcile.Reconciler
+var _ reconcile.Reconciler = &ReconcileTopicTransfer{}
+
+// ReconcileTopicTransfer reconciles a TopicTransfer object
+type ReconcileTopicTransfer struct {
+ // This client, initialized using mgr.Client() above, is a split client
+ // that reads objects from the cache and writes to the apiserver
+ client client.Client
+ scheme *runtime.Scheme
+}
+
+// Reconcile reads that state of the cluster for a TopicTransfer object and makes changes based on the state read
+// and what is in the TopicTransfer.Spec
+// TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates
+// a Pod as an example
+// Note:
+// The Controller will requeue the Request to be processed again if the returned error is non-nil or
+// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
+func (r *ReconcileTopicTransfer) Reconcile(request reconcile.Request) (reconcile.Result, error) {
+ reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
+ reqLogger.Info("Reconciling TopicTransfer")
+
+ // Fetch the TopicTransfer topicTransfer
+ topicTransfer := &rocketmqv1alpha1.TopicTransfer{}
+ err := r.client.Get(context.TODO(), request.NamespacedName, topicTransfer)
+ if err != nil {
+ if errors.IsNotFound(err) {
+ // Request object not found, could have been deleted after reconcile request.
+ // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
+ // Return and don't requeue
+ return reconcile.Result{}, nil
+ }
+ // Error reading the object - requeue the request.
+ return reconcile.Result{}, err
+ }
+
+ // TODO: if ConsumerGroup could be decided by listing the topics
+
+ // TODO: what if current name server is compromised
+ // step1: add consumer group to target cluster
+ nameServer := strings.Split(share.NameServersStr, ";")[0]
+ consumerGroup := topicTransfer.Spec.ConsumerGroup
+ targetCluster := topicTransfer.Spec.TargetCluster
+ sourceCluster := topicTransfer.Spec.SourceCluster
+ topic := topicTransfer.Spec.Topic
+
+ addConsumerGroupToTargetCluster := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
+ addConsumerGroupToTargetClusterCommand := strings.Join(addConsumerGroupToTargetCluster, " ")
+ reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand)
+ cmd := exec.Command("sh", addConsumerGroupToTargetClusterCommand)
+ output, err := cmd.Output()
+ if err != nil {
+ reqLogger.Error(err, "Failed to add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster)
+ return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ }
+ // isAddConsumerGroupToTargetClusterOutputValid(string(output))
+ reqLogger.Info("Successfully add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + string(output))
+
+ // step2: add consumer group to target cluster
+ addTopicToTargetCluster := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
+ addTopicToTargetClusterCommand := strings.Join(addTopicToTargetCluster, " ")
+ reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand)
+ cmd = exec.Command("sh", addTopicToTargetClusterCommand)
+ output, err = cmd.Output()
+ if err != nil {
+ reqLogger.Error(err, "Failed to add Topic " + topic + " to TargetCluster " + targetCluster)
+ return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ }
+ // TODO: verify output of addTopicToTargetClusterCommand
+ reqLogger.Info("Successfully add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + string(output))
+
+ // step3: stop write in source cluster topic
+ stopSourceClusterTopicWrite := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
+ stopSourceClusterTopicWriteCommand := strings.Join(stopSourceClusterTopicWrite, " ")
+ reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand)
+ cmd = exec.Command("sh", stopSourceClusterTopicWriteCommand)
+ output, err = cmd.Output()
+ if err != nil {
+ reqLogger.Error(err, "Failed to stop Topic " + topic + " write in SourceCluster " + sourceCluster)
+ return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ }
+ // TODO: verify output of stopSourceClusterTopicWriteCommand
+ reqLogger.Info("Successfully stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+ // step4: check source cluster unconsumed message
+ for {
+ checkConsumeProgress := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
+ checkConsumeProgressCommand := strings.Join(checkConsumeProgress, " ")
+ reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand)
+ cmd = exec.Command("sh", checkConsumeProgressCommand)
+ output, err = cmd.Output()
+ if err != nil {
+ reqLogger.Error(err, "Failed to check consumerGroup " + consumerGroup)
+ return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ }
+ var finished bool = isConsumeFinished(string(output))
+ reqLogger.Info(" output: " + string(output))
+ if finished {
+ break
+ }
+ time.Sleep(time.Duration(5) * time.Second)
+ }
+
+ // step5: delete topic in source cluster
+ deleteSourceClusterTopic := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
+ deleteSourceClusterTopicCommand := strings.Join(deleteSourceClusterTopic, " ")
+ reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand)
+ cmd = exec.Command("sh", deleteSourceClusterTopicCommand)
+ output, err = cmd.Output()
+ if err != nil {
+ reqLogger.Error(err, "Failed to delete Topic " + topic + " in SourceCluster " + sourceCluster)
+ return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ }
+ // TODO: verify output
+ reqLogger.Info("Successfully delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+ // step6: delete consumer group in source cluster
+ deleteConsumerGroup := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
+ deleteConsumerGroupCommand := strings.Join(deleteConsumerGroup, " ")
+ reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand)
+ cmd = exec.Command("sh", deleteConsumerGroupCommand)
+ output, err = cmd.Output()
+ if err != nil {
+ reqLogger.Error(err, "Failed to delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster)
+ return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ }
+ // TODO: verify output
+ reqLogger.Info("Successfully delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+ // step7: create retry topic
+ createRetryTopic := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
+ createRetryTopicCommand := strings.Join(createRetryTopic, " ")
+ reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand)
+ cmd = exec.Command("sh", createRetryTopicCommand)
+ output, err = cmd.Output()
+ if err != nil {
+ reqLogger.Error(err, "Failed to create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster)
+ return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ }
+ // TODO: verify output
+ reqLogger.Info("Successfully create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + string(output))
+
+ reqLogger.Info("Topic " + topic + " has been transferred from " + sourceCluster + " to " + targetCluster)
+ return reconcile.Result{}, nil
+}
+
+func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
+ cmdOpts := []string{
+ cons.AdminToolDir,
+ "updatetopic",
+ "-t",
+ "%RETRY%" + consumerGroup,
+ "-c",
+ cluster,
+ "-r",
+ "1",
+ "-w",
+ "1",
+ "-p",
+ "6",
+ "-n",
+ nameServer,
+ }
+ return cmdOpts
+}
+
+func isConsumeFinished(output string) bool {
+ return true
+}
+
+func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) []string {
+ cmdOpts := []string{
+ cons.AdminToolDir,
+ "deleteSubGroup",
+ "-g",
+ consumerGroup,
+ "-c",
+ cluster,
+ "-n",
+ nameServer,
+ }
+ return cmdOpts
+}
+
+func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) []string {
+ cmdOpts := []string{
+ cons.AdminToolDir,
+ "deletetopic",
+ "-t",
+ topic,
+ "-c",
+ sourceCluster,
+ "-n",
+ nameServer,
+ }
+ return cmdOpts
+}
+
+func isAddConsumerGroupToTargetClusterOutputValid(s string) {
+
+}
+
+func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) []string {
+ cmdOpts := []string{
+ cons.AdminToolDir,
+ "consumerprogress",
+ "-g",
+ consumerGroup,
+ "-n",
+ nameServer,
+ }
+ return cmdOpts
+}
+
+func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) []string {
+ cmdOpts := []string{
+ cons.AdminToolDir,
+ "updatetopic",
+ "-t",
+ topic,
+ "-c",
+ cluster,
+ "-p",
+ "4",
+ "-w",
+ "0",
+ "-n",
+ nameServer,
+ }
+ return cmdOpts
+}
+
+func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
+ cmdOpts := []string{
+ cons.AdminToolDir,
+ "updatesubgroup",
+ "-g",
+ consumerGroup,
+ "-c",
+ cluster,
+ "-m",
+ "true",
+ "-d",
+ "true",
+ "-n",
+ nameServer,
+ }
+ return cmdOpts
+}
+
+func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) []string {
+ cmdOpts := []string{
+ cons.AdminToolDir,
+ "updatetopic",
+ "-t",
+ topic,
+ "-c",
+ cluster,
+ "-n",
+ nameServer,
+ }
+ return cmdOpts
+}