feat(topic transfer): add roll-back operations and verify operations for topic transfer atomicity
diff --git a/example/rocketmq_v1alpha1_topictransfer_cr.yaml b/example/rocketmq_v1alpha1_topictransfer_cr.yaml
index 4d05b34..8597da0 100644
--- a/example/rocketmq_v1alpha1_topictransfer_cr.yaml
+++ b/example/rocketmq_v1alpha1_topictransfer_cr.yaml
@@ -23,6 +23,6 @@
   # consumerGroup defines the consumer group
   consumerGroup: please_rename_unique_group_name_4
   # sourceCluster define the source cluster
-  sourceCluster: broker
+  sourceCluster: broker-0
   # targetCluster defines the target cluster
-  targetCluster: targetbroker
+  targetCluster: broker-1
diff --git a/install-operator.sh b/install-operator.sh
index 2fc7b00..15ef3b1 100755
--- a/install-operator.sh
+++ b/install-operator.sh
@@ -17,6 +17,7 @@
 
 kubectl create -f deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
 kubectl create -f deploy/crds/rocketmq_v1alpha1_nameservice_crd.yaml
+kubectl create -f deploy/crds/rocketmq_v1alpha1_topictransfer_cr.yaml
 kubectl create -f deploy/service_account.yaml
 kubectl create -f deploy/role.yaml
 kubectl create -f deploy/role_binding.yaml
diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go
index ee349ed..1f64d54 100644
--- a/pkg/constants/constants.go
+++ b/pkg/constants/constants.go
@@ -19,6 +19,7 @@
 
 const BrokerClusterPrefix = "broker-cluster-"
 const BrokerContainerName = "broker"
+const BasicCommand = "sh"
 const AdminToolDir = "/home/rocketmq/rocketmq-4.5.0/bin/mqadmin"
 const StoreConfigDir = "/home/rocketmq/store/config"
 const TopicJsonDir = "/home/rocketmq/store/config/topics.json"
@@ -42,12 +43,19 @@
 const BrokerMainContainerPortName = "main"
 const BrokerHighAvailabilityContainerPort  = 10912
 const BrokerHighAvailabilityContainerPortName = "ha"
-
+// storage modes
 const StorageModeNFS  = "NFS"
 const StorageModeEmptyDir  = "EmptyDir"
 const StorageModeHostPath  = "HostPath"
-
+// threshold values
 const RestartBrokerPodIntervalInSecond  = 30
 const MinMetadataJsonFileSize  = 5
 const MinIpListLength  = 8
-
+const CheckConsumeFinishIntervalInSecond = 5
+const RequeueIntervalInSecond = 6
+// fields
+const TopicIndex = 0
+const BrokerNameIndex = 1
+const DiffIndex = 6
+const TopicListTopic = 1
+const TopicListConsumerGroup = 2
diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go
index 75ee44e..8d54e73 100644
--- a/pkg/controller/broker/broker_controller.go
+++ b/pkg/controller/broker/broker_controller.go
@@ -308,7 +308,7 @@
 	//	}
 	//}
 
-	return reconcile.Result{true, time.Duration(3) * time.Second}, nil
+	return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
 }
 
 func getCopyMetadataJsonCommand(dir string, sourcePodName string, namespace string, k8s *tool.K8sClient) string {
@@ -414,7 +414,7 @@
 							Value: strconv.Itoa(replicaIndex),
 						}, {
 							Name:  cons.EnvBrokerClusterName,
-							Value: broker.Name,
+							Value: broker.Name + "-" + strconv.Itoa(brokerGroupIndex),
 						}, {
 							Name:  cons.EnvBrokerName,
 							Value: broker.Name + "-" + strconv.Itoa(brokerGroupIndex),
diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go
index 8385350..b06083d 100644
--- a/pkg/controller/nameservice/nameservice_controller.go
+++ b/pkg/controller/nameservice/nameservice_controller.go
@@ -232,7 +232,7 @@
 	}
 
 	if requeue {
-		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3)*time.Second}, nil
+		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond)*time.Second}, nil
 	} else {
 		return reconcile.Result{}, nil
 	}
diff --git a/pkg/controller/topictransfer/topictransfer_controller.go b/pkg/controller/topictransfer/topictransfer_controller.go
index 33c4ea6..abc0f6a 100644
--- a/pkg/controller/topictransfer/topictransfer_controller.go
+++ b/pkg/controller/topictransfer/topictransfer_controller.go
@@ -20,6 +20,7 @@
 import (
 	"context"
 	"os/exec"
+	"strconv"
 	"strings"
 	"time"
 
@@ -39,6 +40,8 @@
 )
 
 var log = logf.Log.WithName("controller_topictransfer")
+var undo = false
+var status = 0
 
 /**
 * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
@@ -119,119 +122,285 @@
 		return reconcile.Result{}, err
 	}
 
-	// TODO: if ConsumerGroup could be decided by listing the topics
 
-	// TODO: what if current name server is compromised
-	// step1: add consumer group to target cluster
-	nameServer := strings.Split(share.NameServersStr, ";")[0]
-	consumerGroup := topicTransfer.Spec.ConsumerGroup
+	topic := topicTransfer.Spec.Topic
 	targetCluster := topicTransfer.Spec.TargetCluster
 	sourceCluster := topicTransfer.Spec.SourceCluster
-	topic := topicTransfer.Spec.Topic
 
-	addConsumerGroupToTargetCluster := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
-	addConsumerGroupToTargetClusterCommand := strings.Join(addConsumerGroupToTargetCluster, " ")
-	reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand)
-	cmd := exec.Command("sh", addConsumerGroupToTargetClusterCommand)
-	output, err := cmd.Output()
-	if err != nil {
-		reqLogger.Error(err, "Failed to add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster)
-		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+	nameServer := strings.Split(share.NameServersStr, ";")[0]
+	if len(nameServer) < cons.MinIpListLength {
+		reqLogger.Info("There is no available name server now thus the topic transfer process is terminated.")
+		// terminate the transfer process
+		return reconcile.Result{}, nil
 	}
-	// isAddConsumerGroupToTargetClusterOutputValid(string(output))
-	reqLogger.Info("Successfully add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + string(output))
 
-	// step2: add consumer group to target cluster
-	addTopicToTargetCluster := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
-	addTopicToTargetClusterCommand := strings.Join(addTopicToTargetCluster, " ")
-	reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand)
-	cmd = exec.Command("sh", addTopicToTargetClusterCommand)
-	output, err = cmd.Output()
-	if err != nil {
-		reqLogger.Error(err, "Failed to add Topic " + topic + " to TargetCluster " + targetCluster)
-		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+	// ConsumerGroup could be decided by listing the topics
+	consumerGroups := getConsumerGroupByTopic(topic, nameServer)
+	if consumerGroups == nil {
+		reqLogger.Info("There is no consumer group of topic " + topic)
 	}
-	// TODO: verify output of addTopicToTargetClusterCommand
-	reqLogger.Info("Successfully add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + string(output))
 
-	// step3: stop write in source cluster topic
-	stopSourceClusterTopicWrite := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
-	stopSourceClusterTopicWriteCommand := strings.Join(stopSourceClusterTopicWrite, " ")
-	reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand)
-	cmd = exec.Command("sh", stopSourceClusterTopicWriteCommand)
-	output, err = cmd.Output()
-	if err != nil {
-		reqLogger.Error(err, "Failed to stop Topic " + topic + " write in SourceCluster " + sourceCluster)
-		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
-	}
-	// TODO: verify output of stopSourceClusterTopicWriteCommand
-	reqLogger.Info("Successfully stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + string(output))
+	if undo {
+		// undo the operations for atomicity
+		reqLogger.Info("Transfer topic " + topic + "  from " + sourceCluster + " to " + targetCluster + " failed, rolling back...")
+		switch status {
+		case 7:
+			fallthrough
+		case 6:
+			undoDeleteConsumeGroup(consumerGroups, sourceCluster, nameServer)
+			fallthrough
+		case 5:
+			undoDeleteTopic(topic, sourceCluster, nameServer)
+			fallthrough
+		case 4:
+			fallthrough
+		case 3:
+			undoStopWrite(topic, sourceCluster, nameServer)
+		default:
+			// for user data safety, no special operations for other status
+		}
+	} else {
+		// step1: add all consumer groups to target cluster
+		status = 1
+		for i, consumerGroup := range consumerGroups {
+			log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
+			addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
+			reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand)
+			cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand)
+			output, err := cmd.Output()
+			// validate command output
+			if err != nil || !isUpdateConsumerGroupSuccess(string(output)) {
+				reqLogger.Error(err, "Failed to add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + string(output))
+				// terminate the transfer process
+				undo = true
+				return reconcile.Result{Requeue: true}, err
+			}
+			reqLogger.Info("Successfully add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + string(output))
+		}
 
-	// step4: check source cluster unconsumed message
-	for {
-		checkConsumeProgress := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
-		checkConsumeProgressCommand := strings.Join(checkConsumeProgress, " ")
-		reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand)
-		cmd = exec.Command("sh", checkConsumeProgressCommand)
+		// step2: add consumer group to target cluster
+		status = 2
+		addTopicToTargetClusterCommand := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
+		reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand)
+		cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToTargetClusterCommand)
+		output, err := cmd.Output()
+		// validate command output
+		if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+			reqLogger.Error(err, "Failed to add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + string(output))
+			// terminate the transfer process
+			undo = true
+			return reconcile.Result{Requeue: true}, err
+		}
+		reqLogger.Info("Successfully add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + string(output))
+
+		// step3: stop write in source cluster topic
+		status = 3
+		stopSourceClusterTopicWriteCommand := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
+		reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand)
+		cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, stopSourceClusterTopicWriteCommand)
 		output, err = cmd.Output()
-		if err != nil {
-			reqLogger.Error(err, "Failed to check consumerGroup " + consumerGroup)
-			return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+		// validate command output
+		if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+			reqLogger.Error(err, "Failed to stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + string(output))
+			// terminate the transfer process
+			undo = true
+			return reconcile.Result{Requeue: true}, err
 		}
-		var finished bool = isConsumeFinished(string(output))
-		reqLogger.Info(" output: " + string(output))
-		if finished {
-			break
+		reqLogger.Info("Successfully stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+		// step4: check source cluster unconsumed message
+		status = 4
+		for i, consumerGroup := range consumerGroups {
+			log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
+			for {
+				checkConsumeProgressCommand := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
+				reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand)
+				cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, checkConsumeProgressCommand)
+				output, err = cmd.Output()
+				if err != nil || !isCheckConsumeProcessCommandSuccess(string(output)) {
+					reqLogger.Error(err, "Failed to check consumerGroup "+consumerGroup+" with output: "+string(output))
+					// terminate the transfer process
+					undo = true
+					return reconcile.Result{Requeue: true}, err
+				}
+				reqLogger.Info(" output: " + string(output))
+				if isConsumeFinished(string(output), topic, sourceCluster) {
+					reqLogger.Info("Message consumption of " + consumerGroup + " in source cluster " + sourceCluster + " finished!")
+					break
+				}
+				reqLogger.Info("Wait a moment for message consumption of " + consumerGroup + " in source cluster " + sourceCluster + " finish ...")
+				time.Sleep(time.Duration(cons.CheckConsumeFinishIntervalInSecond) * time.Second)
+			}
 		}
-		time.Sleep(time.Duration(5) * time.Second)
+
+		// step5: delete topic in source cluster
+		status = 5
+		deleteSourceClusterTopicCommand := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
+		reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand)
+		cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteSourceClusterTopicCommand)
+		output, err = cmd.Output()
+		if err != nil || !isDeleteTopicCommandSuccess(string(output)) {
+			reqLogger.Error(err, "Failed to delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+			// terminate the transfer process
+			undo = true
+			return reconcile.Result{Requeue: true}, err
+		}
+		reqLogger.Info("Successfully delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+		// step6: delete consumer group in source cluster
+		status = 6
+		for i, consumerGroup := range consumerGroups {
+			log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
+			deleteConsumerGroupCommand := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
+			reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand)
+			cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteConsumerGroupCommand)
+			output, err = cmd.Output()
+			if err != nil || !isDeleteConsumerGroupSuccess(string(output)) {
+				reqLogger.Error(err, "Failed to delete consumer group "+consumerGroup+" in SourceCluster "+sourceCluster+" with output: "+string(output))
+				// terminate the transfer process
+				undo = true
+				return reconcile.Result{Requeue: true}, err
+			}
+			reqLogger.Info("Successfully delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+		}
+
+		// step7: create retry topic
+		status = 7
+		for i, consumerGroup := range consumerGroups {
+			log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
+			createRetryTopicCommand := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
+			reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand)
+			cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, createRetryTopicCommand)
+			output, err = cmd.Output()
+			if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+				reqLogger.Error(err, "Failed to create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + string(output))
+				// terminate the transfer process
+				undo = true
+				return reconcile.Result{Requeue: true}, err
+			}
+			reqLogger.Info("Successfully create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + string(output))
+		}
+
+		reqLogger.Info("Topic " + topic + " has been successfully transferred from " + sourceCluster + " to " + targetCluster)
 	}
 
-	// step5: delete topic in source cluster
-	deleteSourceClusterTopic := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
-	deleteSourceClusterTopicCommand := strings.Join(deleteSourceClusterTopic, " ")
-	reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand)
-	cmd = exec.Command("sh", deleteSourceClusterTopicCommand)
-	output, err = cmd.Output()
-	if err != nil {
-		reqLogger.Error(err, "Failed to delete Topic " + topic + " in SourceCluster " + sourceCluster)
-		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
-	}
-	// TODO: verify output
-	reqLogger.Info("Successfully delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + string(output))
-
-	// step6: delete consumer group in source cluster
-	deleteConsumerGroup := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
-	deleteConsumerGroupCommand := strings.Join(deleteConsumerGroup, " ")
-	reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand)
-	cmd = exec.Command("sh", deleteConsumerGroupCommand)
-	output, err = cmd.Output()
-	if err != nil {
-		reqLogger.Error(err, "Failed to delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster)
-		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
-	}
-	// TODO: verify output
-	reqLogger.Info("Successfully delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster + " with output: " + string(output))
-
-	// step7: create retry topic
-	createRetryTopic := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
-	createRetryTopicCommand := strings.Join(createRetryTopic, " ")
-	reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand)
-	cmd = exec.Command("sh", createRetryTopicCommand)
-	output, err = cmd.Output()
-	if err != nil {
-		reqLogger.Error(err, "Failed to create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster)
-		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
-	}
-	// TODO: verify output
-	reqLogger.Info("Successfully create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + string(output))
-
-	reqLogger.Info("Topic " + topic + " has been transferred from " + sourceCluster + " to " + targetCluster)
 	return reconcile.Result{}, nil
 }
 
-func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
+func undoStopWrite(topic string, cluster string, nameServer string) {
+	addTopicToClusterCommand := buildUndoStopWriteCommand(topic, cluster, nameServer)
+	log.Info("undoStopWrite: " + addTopicToClusterCommand)
+	cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand)
+	output, err := cmd.Output()
+	if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+		log.Error(err, "Failed to undo stop write topic with output: " + string(output))
+	}
+	log.Info("Successfully undo stop write topic with output: " + string(output))
+}
+
+func undoDeleteTopic(topic string, cluster string, nameServer string) {
+	addTopicToClusterCommand := buildAddTopicToClusterCommand(topic, cluster, nameServer)
+	log.Info("undoDeleteTopic: " + addTopicToClusterCommand)
+	cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand)
+	output, err := cmd.Output()
+	if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+		log.Error(err, "Failed to undo delete topic with output: " + string(output))
+	}
+	log.Info("Successfully undo delete topic with output: " + string(output))
+}
+
+func undoDeleteConsumeGroup(consumerGroups []string, cluster string, nameServer string) {
+	for _, consumerGroup := range consumerGroups {
+		addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, cluster, nameServer)
+		log.Info("undoDeleteConsumeGroup: " + addConsumerGroupToTargetClusterCommand)
+		cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand)
+		output, err := cmd.Output()
+		if err != nil || !isUpdateConsumerGroupSuccess(string(output)) {
+			log.Error(err, "Failed to undo delete consume group with output: " + string(output))
+		}
+		log.Info("Successfully undo delete consume group with output: " + string(output))
+	}
+}
+
+func getConsumerGroupByTopic(topic string, nameServer string) []string {
+	var consumerGroups []string
+	topicListCmd := buildTopicListCommand(nameServer)
+	cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, topicListCmd)
+	output, err := cmd.Output()
+	if err != nil || !isTopicListSuccess(string(output)) {
+		log.Error(err, "Failed to list topic with output: " + string(output))
+		return nil
+	}
+	log.Info("topicListCmd output: " + string(output))
+
+	lines := strings.Split(string(output), "\n")
+	for i := 1; i < len(lines); i++ {
+		fields := strings.Fields(strings.TrimSpace(lines[i]))
+		if len(fields) >= cons.TopicListConsumerGroup {
+			if fields[cons.TopicListTopic] == topic {
+				consumerGroups = append(consumerGroups, fields[cons.TopicListConsumerGroup])
+			}
+		}
+	}
+	return consumerGroups
+}
+
+func isTopicListSuccess(s string) bool {
+	return strings.Contains(s, "#Topic") && strings.Contains(s, "#Consumer Group")
+}
+
+func isCheckConsumeProcessCommandSuccess(s string) bool {
+	return strings.Contains(s, "#Topic")
+}
+
+func isDeleteTopicCommandSuccess(s string) bool {
+	return strings.Contains(s, "delete topic") && strings.Contains(s, "success")
+}
+
+func isUpdateTopicCommandSuccess(s string) bool {
+	return strings.Contains(s, "create topic") && strings.Contains(s, "success")
+}
+
+func isDeleteConsumerGroupSuccess(s string) bool {
+	return strings.Contains(s, "delete subscription group") && strings.Contains(s, "success")
+}
+
+func isUpdateConsumerGroupSuccess(s string) bool {
+	// return strings.Contains(s, "create subscription group") && strings.Contains(s, "success")
+	return strings.Contains(s, "groupName")
+}
+
+func buildUndoStopWriteCommand(topic string, cluster string, nameServer string) string {
 	cmdOpts := []string{
-		cons.AdminToolDir,
+		"updatetopic",
+		"-t",
+		topic,
+		"-c",
+		cluster,
+		"-r",
+		"8",
+		"-w",
+		"8",
+		"-p",
+		"6",
+		"-n",
+		nameServer,
+	}
+	return strings.Join(cmdOpts, " ")
+}
+
+func buildTopicListCommand(nameServer string) string {
+	cmdOpts := []string{
+		"topiclist",
+		"-c",
+		"-n",
+		nameServer,
+	}
+	return strings.Join(cmdOpts, " ")
+}
+
+func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) string {
+	cmdOpts := []string{
 		"updatetopic",
 		"-t",
 		"%RETRY%" + consumerGroup,
@@ -246,16 +415,38 @@
 		"-n",
 		nameServer,
 	}
-	return cmdOpts
+	return strings.Join(cmdOpts, " ")
 }
 
-func isConsumeFinished(output string) bool {
+func getClusterBrokerNames(cluster string) []string {
+	// TODO: consider more scenarios
+	return []string {cluster}
+}
+
+func isConsumeFinished(output string, topic string, cluster string) bool {
+	lines := strings.Split(output, "\n")
+	brokers := getClusterBrokerNames(cluster)
+	for i := 1; i < len(lines); i++ {
+		fields := strings.Fields(strings.TrimSpace(lines[i]))
+		if len(fields) > cons.DiffIndex {
+			for _, broker := range brokers {
+				log.Info("broker = " + broker)
+				log.Info("fields[cons.TopicIndex] = " + fields[cons.TopicIndex] + " , in line " + strconv.Itoa(i))
+				log.Info("fields[cons.BrokerNameIndex] = " + fields[cons.BrokerNameIndex] + " , in line " + strconv.Itoa(i))
+				log.Info("fields[cons.DiffIndex] = " + fields[cons.DiffIndex] + " , in line " + strconv.Itoa(i))
+				if fields[cons.TopicIndex] == topic && fields[cons.BrokerNameIndex] == broker {
+					if fields[cons.DiffIndex] != "0" {
+						return false
+					}
+				}
+			}
+		}
+	}
 	return true
 }
 
-func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) []string {
+func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) string {
 	cmdOpts := []string{
-		cons.AdminToolDir,
 		"deleteSubGroup",
 		"-g",
 		consumerGroup,
@@ -264,12 +455,11 @@
 		"-n",
 		nameServer,
 	}
-	return cmdOpts
+	return strings.Join(cmdOpts, " ")
 }
 
-func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) []string {
+func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) string {
 	cmdOpts := []string{
-		cons.AdminToolDir,
 		"deletetopic",
 		"-t",
 		topic,
@@ -278,28 +468,22 @@
 		"-n",
 		nameServer,
 	}
-	return cmdOpts
+	return strings.Join(cmdOpts, " ")
 }
 
-func isAddConsumerGroupToTargetClusterOutputValid(s string)  {
-
-}
-
-func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) []string {
+func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) string {
 	cmdOpts := []string{
-		cons.AdminToolDir,
 		"consumerprogress",
 		"-g",
 		consumerGroup,
 		"-n",
 		nameServer,
 	}
-	return cmdOpts
+	return strings.Join(cmdOpts, " ")
 }
 
-func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) []string {
+func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) string {
 	cmdOpts := []string{
-		cons.AdminToolDir,
 		"updatetopic",
 		"-t",
 		topic,
@@ -312,12 +496,11 @@
 		"-n",
 		nameServer,
 	}
-	return cmdOpts
+	return strings.Join(cmdOpts, " ")
 }
 
-func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
+func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) string {
 	cmdOpts := []string{
-		cons.AdminToolDir,
 		"updatesubgroup",
 		"-g",
 		consumerGroup,
@@ -330,12 +513,11 @@
 		"-n",
 		nameServer,
 	}
-	return cmdOpts
+	return strings.Join(cmdOpts, " ")
 }
 
-func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) []string {
+func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) string {
 	cmdOpts := []string{
-		cons.AdminToolDir,
 		"updatetopic",
 		"-t",
 		topic,
@@ -344,5 +526,5 @@
 		"-n",
 		nameServer,
 	}
-	return cmdOpts
+	return strings.Join(cmdOpts, " ")
 }
diff --git a/purge-operator.sh b/purge-operator.sh
index 81c75d0..f49bb32 100755
--- a/purge-operator.sh
+++ b/purge-operator.sh
@@ -25,4 +25,4 @@
 kubectl delete -f deploy/service_account.yaml
 kubectl delete -f deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
 kubectl delete -f deploy/crds/rocketmq_v1alpha1_nameservice_crd.yaml
-
+kubectl delete -f deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml