feat(topic transfer): add CRD and controller for topic transfer
diff --git a/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml b/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml
new file mode 100644
index 0000000..7c3a4f7
--- /dev/null
+++ b/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml
@@ -0,0 +1,51 @@
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+  name: topictransfers.rocketmq.apache.org
+spec:
+  group: rocketmq.apache.org
+  names:
+    kind: TopicTransfer
+    listKind: TopicTransferList
+    plural: topictransfers
+    singular: topictransfer
+  scope: Namespaced
+  subresources:
+    status: {}
+  validation:
+    openAPIV3Schema:
+      properties:
+        apiVersion:
+          description: 'APIVersion defines the versioned schema of this representation
+            of an object. Servers should convert recognized schemas to the latest
+            internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
+          type: string
+        kind:
+          description: 'Kind is a string value representing the REST resource this
+            object represents. Servers may infer this from the endpoint the client
+            submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
+          type: string
+        metadata:
+          type: object
+        spec:
+          properties:
+            consumerGroup:
+              description: Consumer group
+              type: string
+            sourceCluster:
+              description: The cluster where the transferred topic from
+              type: string
+            targetCluster:
+              description: The cluster where the topic will be transferred to
+              type: string
+            topic:
+              description: Topic name
+              type: string
+          type: object
+        status:
+          type: object
+  version: v1alpha1
+  versions:
+  - name: v1alpha1
+    served: true
+    storage: true
diff --git a/deploy/role.yaml b/deploy/role.yaml
index 0cfc34c..a27f75f 100644
--- a/deploy/role.yaml
+++ b/deploy/role.yaml
@@ -53,5 +53,6 @@
   - '*'
   - brokers
   - pods/exec
+  - topictransfers
   verbs:
   - '*'
diff --git a/example/rocketmq_v1alpha1_topictransfer_cr.yaml b/example/rocketmq_v1alpha1_topictransfer_cr.yaml
new file mode 100644
index 0000000..4d05b34
--- /dev/null
+++ b/example/rocketmq_v1alpha1_topictransfer_cr.yaml
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: rocketmq.apache.org/v1alpha1
+kind: TopicTransfer
+metadata:
+  name: topictransfer
+spec:
+  # topic defines which topic to be transferred
+  topic: TopicTest
+  # consumerGroup defines the consumer group
+  consumerGroup: please_rename_unique_group_name_4
+  # sourceCluster define the source cluster
+  sourceCluster: broker
+  # targetCluster defines the target cluster
+  targetCluster: targetbroker
diff --git a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go
new file mode 100644
index 0000000..fbcb2f3
--- /dev/null
+++ b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package v1alpha1
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
+// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.
+
+// TopicTransferSpec defines the desired state of TopicTransfer
+// +k8s:openapi-gen=true
+type TopicTransferSpec struct {
+	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
+	// Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
+	// Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
+
+	// Topic name
+	Topic string `json:"topic,omitempty"`
+	// Consumer group
+	ConsumerGroup string `json:"consumerGroup,omitempty"`
+	// The cluster where the transferred topic from
+	SourceCluster string `json:"sourceCluster,omitempty"`
+	// The cluster where the topic will be transferred to
+	TargetCluster string `json:"targetCluster,omitempty"`
+}
+
+// TopicTransferStatus defines the observed state of TopicTransfer
+// +k8s:openapi-gen=true
+type TopicTransferStatus struct {
+	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
+	// Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
+	// Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
+}
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// TopicTransfer is the Schema for the topictransfers API
+// +k8s:openapi-gen=true
+// +kubebuilder:subresource:status
+type TopicTransfer struct {
+	metav1.TypeMeta   `json:",inline"`
+	metav1.ObjectMeta `json:"metadata,omitempty"`
+
+	Spec   TopicTransferSpec   `json:"spec,omitempty"`
+	Status TopicTransferStatus `json:"status,omitempty"`
+}
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// TopicTransferList contains a list of TopicTransfer
+type TopicTransferList struct {
+	metav1.TypeMeta `json:",inline"`
+	metav1.ListMeta `json:"metadata,omitempty"`
+	Items           []TopicTransfer `json:"items"`
+}
+
+func init() {
+	SchemeBuilder.Register(&TopicTransfer{}, &TopicTransferList{})
+}
diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go
index f6ed7a4..ee349ed 100644
--- a/pkg/constants/constants.go
+++ b/pkg/constants/constants.go
@@ -49,4 +49,5 @@
 
 const RestartBrokerPodIntervalInSecond  = 30
 const MinMetadataJsonFileSize  = 5
+const MinIpListLength  = 8
 
diff --git a/pkg/controller/add_topictransfer.go b/pkg/controller/add_topictransfer.go
new file mode 100644
index 0000000..8d9bbc3
--- /dev/null
+++ b/pkg/controller/add_topictransfer.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+	"github.com/apache/rocketmq-operator/pkg/controller/topictransfer"
+)
+
+func init() {
+	// AddToManagerFuncs is a list of functions to create controllers and add them to a manager.
+	AddToManagerFuncs = append(AddToManagerFuncs, topictransfer.Add)
+}
diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go
index aabe517..75ee44e 100644
--- a/pkg/controller/broker/broker_controller.go
+++ b/pkg/controller/broker/broker_controller.go
@@ -260,7 +260,6 @@
 			log.Info("subscriptionGroupCommand: " + subscriptionGroupCommand)
 			MakeConfigDirCommand := "mkdir -p " + cons.StoreConfigDir
 			ChmodDirCommand := "chmod a+rw " + cons.StoreConfigDir
-			log.Info("ChmodDirCommand: " + ChmodDirCommand)
 			cmd = []string{"/bin/bash", "-c", MakeConfigDirCommand + " && " + ChmodDirCommand + " && " + topicsCommand + " && " + subscriptionGroupCommand}
 		}
 	}
diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go
index 6c0db47..8385350 100644
--- a/pkg/controller/nameservice/nameservice_controller.go
+++ b/pkg/controller/nameservice/nameservice_controller.go
@@ -188,9 +188,9 @@
 		share.NameServersStr = nameServerListStr[:len(nameServerListStr)-1]
 		reqLogger.Info("share.NameServersStr:" + share.NameServersStr)
 
-		if len(oldNameServerListStr) < 8 {
+		if len(oldNameServerListStr) <= cons.MinIpListLength {
 			oldNameServerListStr = share.NameServersStr
-		} else if len(share.NameServersStr) > 8 {
+		} else if len(share.NameServersStr) > cons.MinIpListLength {
 			oldNameServerListStr = oldNameServerListStr[:len(oldNameServerListStr)-1]
 			share.IsNameServersStrUpdated = true
 		}
@@ -206,7 +206,7 @@
 		}
 
 		// use admin tool to update broker config
-		if share.IsNameServersStrUpdated && (len(oldNameServerListStr) > 8) && (len(share.NameServersStr) > 8) {
+		if share.IsNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(share.NameServersStr) > cons.MinIpListLength) {
 			mqAdmin := cons.AdminToolDir
 			subCmd := cons.UpdateBrokerConfig
 			key := cons.ParamNameServiceAddress
diff --git a/pkg/controller/topictransfer/topictransfer_controller.go b/pkg/controller/topictransfer/topictransfer_controller.go
new file mode 100644
index 0000000..33c4ea6
--- /dev/null
+++ b/pkg/controller/topictransfer/topictransfer_controller.go
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package topictransfer
+
+import (
+	"context"
+	"os/exec"
+	"strings"
+	"time"
+
+	rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1"
+	cons "github.com/apache/rocketmq-operator/pkg/constants"
+	"github.com/apache/rocketmq-operator/pkg/share"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+var log = logf.Log.WithName("controller_topictransfer")
+
+/**
+* USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
+* business logic.  Delete these comments after modifying this file.*
+ */
+
+// Add creates a new TopicTransfer Controller and adds it to the Manager. The Manager will set fields on the Controller
+// and Start it when the Manager is Started.
+func Add(mgr manager.Manager) error {
+	return add(mgr, newReconciler(mgr))
+}
+
+// newReconciler returns a new reconcile.Reconciler
+func newReconciler(mgr manager.Manager) reconcile.Reconciler {
+	return &ReconcileTopicTransfer{client: mgr.GetClient(), scheme: mgr.GetScheme()}
+}
+
+// add adds a new Controller to mgr with r as the reconcile.Reconciler
+func add(mgr manager.Manager, r reconcile.Reconciler) error {
+	// Create a new controller
+	c, err := controller.New("topictransfer-controller", mgr, controller.Options{Reconciler: r})
+	if err != nil {
+		return err
+	}
+
+	// Watch for changes to primary resource TopicTransfer
+	err = c.Watch(&source.Kind{Type: &rocketmqv1alpha1.TopicTransfer{}}, &handler.EnqueueRequestForObject{})
+	if err != nil {
+		return err
+	}
+
+	// TODO(user): Modify this to be the types you create that are owned by the primary resource
+	// Watch for changes to secondary resource Pods and requeue the owner TopicTransfer
+	err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
+		IsController: true,
+		OwnerType:    &rocketmqv1alpha1.TopicTransfer{},
+	})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// blank assignment to verify that ReconcileTopicTransfer implements reconcile.Reconciler
+var _ reconcile.Reconciler = &ReconcileTopicTransfer{}
+
+// ReconcileTopicTransfer reconciles a TopicTransfer object
+type ReconcileTopicTransfer struct {
+	// This client, initialized using mgr.Client() above, is a split client
+	// that reads objects from the cache and writes to the apiserver
+	client client.Client
+	scheme *runtime.Scheme
+}
+
+// Reconcile reads that state of the cluster for a TopicTransfer object and makes changes based on the state read
+// and what is in the TopicTransfer.Spec
+// TODO(user): Modify this Reconcile function to implement your Controller logic.  This example creates
+// a Pod as an example
+// Note:
+// The Controller will requeue the Request to be processed again if the returned error is non-nil or
+// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
+func (r *ReconcileTopicTransfer) Reconcile(request reconcile.Request) (reconcile.Result, error) {
+	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
+	reqLogger.Info("Reconciling TopicTransfer")
+
+	// Fetch the TopicTransfer topicTransfer
+	topicTransfer := &rocketmqv1alpha1.TopicTransfer{}
+	err := r.client.Get(context.TODO(), request.NamespacedName, topicTransfer)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// Request object not found, could have been deleted after reconcile request.
+			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
+			// Return and don't requeue
+			return reconcile.Result{}, nil
+		}
+		// Error reading the object - requeue the request.
+		return reconcile.Result{}, err
+	}
+
+	// TODO: if ConsumerGroup could be decided by listing the topics
+
+	// TODO: what if current name server is compromised
+	// step1: add consumer group to target cluster
+	nameServer := strings.Split(share.NameServersStr, ";")[0]
+	consumerGroup := topicTransfer.Spec.ConsumerGroup
+	targetCluster := topicTransfer.Spec.TargetCluster
+	sourceCluster := topicTransfer.Spec.SourceCluster
+	topic := topicTransfer.Spec.Topic
+
+	addConsumerGroupToTargetCluster := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
+	addConsumerGroupToTargetClusterCommand := strings.Join(addConsumerGroupToTargetCluster, " ")
+	reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand)
+	cmd := exec.Command("sh", addConsumerGroupToTargetClusterCommand)
+	output, err := cmd.Output()
+	if err != nil {
+		reqLogger.Error(err, "Failed to add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster)
+		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+	}
+	// isAddConsumerGroupToTargetClusterOutputValid(string(output))
+	reqLogger.Info("Successfully add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + string(output))
+
+	// step2: add consumer group to target cluster
+	addTopicToTargetCluster := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
+	addTopicToTargetClusterCommand := strings.Join(addTopicToTargetCluster, " ")
+	reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand)
+	cmd = exec.Command("sh", addTopicToTargetClusterCommand)
+	output, err = cmd.Output()
+	if err != nil {
+		reqLogger.Error(err, "Failed to add Topic " + topic + " to TargetCluster " + targetCluster)
+		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+	}
+	// TODO: verify output of addTopicToTargetClusterCommand
+	reqLogger.Info("Successfully add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + string(output))
+
+	// step3: stop write in source cluster topic
+	stopSourceClusterTopicWrite := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
+	stopSourceClusterTopicWriteCommand := strings.Join(stopSourceClusterTopicWrite, " ")
+	reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand)
+	cmd = exec.Command("sh", stopSourceClusterTopicWriteCommand)
+	output, err = cmd.Output()
+	if err != nil {
+		reqLogger.Error(err, "Failed to stop Topic " + topic + " write in SourceCluster " + sourceCluster)
+		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+	}
+	// TODO: verify output of stopSourceClusterTopicWriteCommand
+	reqLogger.Info("Successfully stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+	// step4: check source cluster unconsumed message
+	for {
+		checkConsumeProgress := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
+		checkConsumeProgressCommand := strings.Join(checkConsumeProgress, " ")
+		reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand)
+		cmd = exec.Command("sh", checkConsumeProgressCommand)
+		output, err = cmd.Output()
+		if err != nil {
+			reqLogger.Error(err, "Failed to check consumerGroup " + consumerGroup)
+			return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+		}
+		var finished bool = isConsumeFinished(string(output))
+		reqLogger.Info(" output: " + string(output))
+		if finished {
+			break
+		}
+		time.Sleep(time.Duration(5) * time.Second)
+	}
+
+	// step5: delete topic in source cluster
+	deleteSourceClusterTopic := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
+	deleteSourceClusterTopicCommand := strings.Join(deleteSourceClusterTopic, " ")
+	reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand)
+	cmd = exec.Command("sh", deleteSourceClusterTopicCommand)
+	output, err = cmd.Output()
+	if err != nil {
+		reqLogger.Error(err, "Failed to delete Topic " + topic + " in SourceCluster " + sourceCluster)
+		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+	}
+	// TODO: verify output
+	reqLogger.Info("Successfully delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+	// step6: delete consumer group in source cluster
+	deleteConsumerGroup := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
+	deleteConsumerGroupCommand := strings.Join(deleteConsumerGroup, " ")
+	reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand)
+	cmd = exec.Command("sh", deleteConsumerGroupCommand)
+	output, err = cmd.Output()
+	if err != nil {
+		reqLogger.Error(err, "Failed to delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster)
+		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+	}
+	// TODO: verify output
+	reqLogger.Info("Successfully delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+	// step7: create retry topic
+	createRetryTopic := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
+	createRetryTopicCommand := strings.Join(createRetryTopic, " ")
+	reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand)
+	cmd = exec.Command("sh", createRetryTopicCommand)
+	output, err = cmd.Output()
+	if err != nil {
+		reqLogger.Error(err, "Failed to create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster)
+		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+	}
+	// TODO: verify output
+	reqLogger.Info("Successfully create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + string(output))
+
+	reqLogger.Info("Topic " + topic + " has been transferred from " + sourceCluster + " to " + targetCluster)
+	return reconcile.Result{}, nil
+}
+
+func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
+	cmdOpts := []string{
+		cons.AdminToolDir,
+		"updatetopic",
+		"-t",
+		"%RETRY%" + consumerGroup,
+		"-c",
+		cluster,
+		"-r",
+		"1",
+		"-w",
+		"1",
+		"-p",
+		"6",
+		"-n",
+		nameServer,
+	}
+	return cmdOpts
+}
+
+func isConsumeFinished(output string) bool {
+	return true
+}
+
+func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) []string {
+	cmdOpts := []string{
+		cons.AdminToolDir,
+		"deleteSubGroup",
+		"-g",
+		consumerGroup,
+		"-c",
+		cluster,
+		"-n",
+		nameServer,
+	}
+	return cmdOpts
+}
+
+func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) []string {
+	cmdOpts := []string{
+		cons.AdminToolDir,
+		"deletetopic",
+		"-t",
+		topic,
+		"-c",
+		sourceCluster,
+		"-n",
+		nameServer,
+	}
+	return cmdOpts
+}
+
+func isAddConsumerGroupToTargetClusterOutputValid(s string)  {
+
+}
+
+func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) []string {
+	cmdOpts := []string{
+		cons.AdminToolDir,
+		"consumerprogress",
+		"-g",
+		consumerGroup,
+		"-n",
+		nameServer,
+	}
+	return cmdOpts
+}
+
+func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) []string {
+	cmdOpts := []string{
+		cons.AdminToolDir,
+		"updatetopic",
+		"-t",
+		topic,
+		"-c",
+		cluster,
+		"-p",
+		"4",
+		"-w",
+		"0",
+		"-n",
+		nameServer,
+	}
+	return cmdOpts
+}
+
+func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
+	cmdOpts := []string{
+		cons.AdminToolDir,
+		"updatesubgroup",
+		"-g",
+		consumerGroup,
+		"-c",
+		cluster,
+		"-m",
+		"true",
+		"-d",
+		"true",
+		"-n",
+		nameServer,
+	}
+	return cmdOpts
+}
+
+func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) []string {
+	cmdOpts := []string{
+		cons.AdminToolDir,
+		"updatetopic",
+		"-t",
+		topic,
+		"-c",
+		cluster,
+		"-n",
+		nameServer,
+	}
+	return cmdOpts
+}