feat(topic transfer): add roll-back operations and verify operations for topic transfer atomicity
diff --git a/example/rocketmq_v1alpha1_topictransfer_cr.yaml b/example/rocketmq_v1alpha1_topictransfer_cr.yaml
index 4d05b34..8597da0 100644
--- a/example/rocketmq_v1alpha1_topictransfer_cr.yaml
+++ b/example/rocketmq_v1alpha1_topictransfer_cr.yaml
@@ -23,6 +23,6 @@
# consumerGroup defines the consumer group
consumerGroup: please_rename_unique_group_name_4
# sourceCluster define the source cluster
- sourceCluster: broker
+ sourceCluster: broker-0
# targetCluster defines the target cluster
- targetCluster: targetbroker
+ targetCluster: broker-1
diff --git a/install-operator.sh b/install-operator.sh
index 2fc7b00..15ef3b1 100755
--- a/install-operator.sh
+++ b/install-operator.sh
@@ -17,6 +17,7 @@
kubectl create -f deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
kubectl create -f deploy/crds/rocketmq_v1alpha1_nameservice_crd.yaml
+kubectl create -f deploy/crds/rocketmq_v1alpha1_topictransfer_cr.yaml
kubectl create -f deploy/service_account.yaml
kubectl create -f deploy/role.yaml
kubectl create -f deploy/role_binding.yaml
diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go
index ee349ed..1f64d54 100644
--- a/pkg/constants/constants.go
+++ b/pkg/constants/constants.go
@@ -19,6 +19,7 @@
const BrokerClusterPrefix = "broker-cluster-"
const BrokerContainerName = "broker"
+const BasicCommand = "sh"
const AdminToolDir = "/home/rocketmq/rocketmq-4.5.0/bin/mqadmin"
const StoreConfigDir = "/home/rocketmq/store/config"
const TopicJsonDir = "/home/rocketmq/store/config/topics.json"
@@ -42,12 +43,19 @@
const BrokerMainContainerPortName = "main"
const BrokerHighAvailabilityContainerPort = 10912
const BrokerHighAvailabilityContainerPortName = "ha"
-
+// storage modes
const StorageModeNFS = "NFS"
const StorageModeEmptyDir = "EmptyDir"
const StorageModeHostPath = "HostPath"
-
+// threshold values
const RestartBrokerPodIntervalInSecond = 30
const MinMetadataJsonFileSize = 5
const MinIpListLength = 8
-
+const CheckConsumeFinishIntervalInSecond = 5
+const RequeueIntervalInSecond = 6
+// fields
+const TopicIndex = 0
+const BrokerNameIndex = 1
+const DiffIndex = 6
+const TopicListTopic = 1
+const TopicListConsumerGroup = 2
diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go
index 75ee44e..8d54e73 100644
--- a/pkg/controller/broker/broker_controller.go
+++ b/pkg/controller/broker/broker_controller.go
@@ -308,7 +308,7 @@
// }
//}
- return reconcile.Result{true, time.Duration(3) * time.Second}, nil
+ return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
}
func getCopyMetadataJsonCommand(dir string, sourcePodName string, namespace string, k8s *tool.K8sClient) string {
@@ -414,7 +414,7 @@
Value: strconv.Itoa(replicaIndex),
}, {
Name: cons.EnvBrokerClusterName,
- Value: broker.Name,
+ Value: broker.Name + "-" + strconv.Itoa(brokerGroupIndex),
}, {
Name: cons.EnvBrokerName,
Value: broker.Name + "-" + strconv.Itoa(brokerGroupIndex),
diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go
index 8385350..b06083d 100644
--- a/pkg/controller/nameservice/nameservice_controller.go
+++ b/pkg/controller/nameservice/nameservice_controller.go
@@ -232,7 +232,7 @@
}
if requeue {
- return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3)*time.Second}, nil
+ return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond)*time.Second}, nil
} else {
return reconcile.Result{}, nil
}
diff --git a/pkg/controller/topictransfer/topictransfer_controller.go b/pkg/controller/topictransfer/topictransfer_controller.go
index 33c4ea6..abc0f6a 100644
--- a/pkg/controller/topictransfer/topictransfer_controller.go
+++ b/pkg/controller/topictransfer/topictransfer_controller.go
@@ -20,6 +20,7 @@
import (
"context"
"os/exec"
+ "strconv"
"strings"
"time"
@@ -39,6 +40,8 @@
)
var log = logf.Log.WithName("controller_topictransfer")
+var undo = false
+var status = 0
/**
* USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
@@ -119,119 +122,285 @@
return reconcile.Result{}, err
}
- // TODO: if ConsumerGroup could be decided by listing the topics
- // TODO: what if current name server is compromised
- // step1: add consumer group to target cluster
- nameServer := strings.Split(share.NameServersStr, ";")[0]
- consumerGroup := topicTransfer.Spec.ConsumerGroup
+ topic := topicTransfer.Spec.Topic
targetCluster := topicTransfer.Spec.TargetCluster
sourceCluster := topicTransfer.Spec.SourceCluster
- topic := topicTransfer.Spec.Topic
- addConsumerGroupToTargetCluster := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
- addConsumerGroupToTargetClusterCommand := strings.Join(addConsumerGroupToTargetCluster, " ")
- reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand)
- cmd := exec.Command("sh", addConsumerGroupToTargetClusterCommand)
- output, err := cmd.Output()
- if err != nil {
- reqLogger.Error(err, "Failed to add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster)
- return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ nameServer := strings.Split(share.NameServersStr, ";")[0]
+ if len(nameServer) < cons.MinIpListLength {
+ reqLogger.Info("There is no available name server now thus the topic transfer process is terminated.")
+ // terminate the transfer process
+ return reconcile.Result{}, nil
}
- // isAddConsumerGroupToTargetClusterOutputValid(string(output))
- reqLogger.Info("Successfully add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + string(output))
- // step2: add consumer group to target cluster
- addTopicToTargetCluster := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
- addTopicToTargetClusterCommand := strings.Join(addTopicToTargetCluster, " ")
- reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand)
- cmd = exec.Command("sh", addTopicToTargetClusterCommand)
- output, err = cmd.Output()
- if err != nil {
- reqLogger.Error(err, "Failed to add Topic " + topic + " to TargetCluster " + targetCluster)
- return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ // ConsumerGroup could be decided by listing the topics
+ consumerGroups := getConsumerGroupByTopic(topic, nameServer)
+ if consumerGroups == nil {
+ reqLogger.Info("There is no consumer group of topic " + topic)
}
- // TODO: verify output of addTopicToTargetClusterCommand
- reqLogger.Info("Successfully add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + string(output))
- // step3: stop write in source cluster topic
- stopSourceClusterTopicWrite := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
- stopSourceClusterTopicWriteCommand := strings.Join(stopSourceClusterTopicWrite, " ")
- reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand)
- cmd = exec.Command("sh", stopSourceClusterTopicWriteCommand)
- output, err = cmd.Output()
- if err != nil {
- reqLogger.Error(err, "Failed to stop Topic " + topic + " write in SourceCluster " + sourceCluster)
- return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
- }
- // TODO: verify output of stopSourceClusterTopicWriteCommand
- reqLogger.Info("Successfully stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + string(output))
+ if undo {
+ // undo the operations for atomicity
+ reqLogger.Info("Transfer topic " + topic + " from " + sourceCluster + " to " + targetCluster + " failed, rolling back...")
+ switch status {
+ case 7:
+ fallthrough
+ case 6:
+ undoDeleteConsumeGroup(consumerGroups, sourceCluster, nameServer)
+ fallthrough
+ case 5:
+ undoDeleteTopic(topic, sourceCluster, nameServer)
+ fallthrough
+ case 4:
+ fallthrough
+ case 3:
+ undoStopWrite(topic, sourceCluster, nameServer)
+ default:
+ // for user data safety, no special operations for other status
+ }
+ } else {
+ // step1: add all consumer groups to target cluster
+ status = 1
+ for i, consumerGroup := range consumerGroups {
+ log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
+ addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
+ reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand)
+ cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand)
+ output, err := cmd.Output()
+ // validate command output
+ if err != nil || !isUpdateConsumerGroupSuccess(string(output)) {
+ reqLogger.Error(err, "Failed to add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + string(output))
+ // terminate the transfer process
+ undo = true
+ return reconcile.Result{Requeue: true}, err
+ }
+ reqLogger.Info("Successfully add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + string(output))
+ }
- // step4: check source cluster unconsumed message
- for {
- checkConsumeProgress := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
- checkConsumeProgressCommand := strings.Join(checkConsumeProgress, " ")
- reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand)
- cmd = exec.Command("sh", checkConsumeProgressCommand)
+ // step2: add consumer group to target cluster
+ status = 2
+ addTopicToTargetClusterCommand := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
+ reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand)
+ cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToTargetClusterCommand)
+ output, err := cmd.Output()
+ // validate command output
+ if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+ reqLogger.Error(err, "Failed to add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + string(output))
+ // terminate the transfer process
+ undo = true
+ return reconcile.Result{Requeue: true}, err
+ }
+ reqLogger.Info("Successfully add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + string(output))
+
+ // step3: stop write in source cluster topic
+ status = 3
+ stopSourceClusterTopicWriteCommand := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
+ reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand)
+ cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, stopSourceClusterTopicWriteCommand)
output, err = cmd.Output()
- if err != nil {
- reqLogger.Error(err, "Failed to check consumerGroup " + consumerGroup)
- return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
+ // validate command output
+ if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+ reqLogger.Error(err, "Failed to stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + string(output))
+ // terminate the transfer process
+ undo = true
+ return reconcile.Result{Requeue: true}, err
}
- var finished bool = isConsumeFinished(string(output))
- reqLogger.Info(" output: " + string(output))
- if finished {
- break
+ reqLogger.Info("Successfully stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+ // step4: check source cluster unconsumed message
+ status = 4
+ for i, consumerGroup := range consumerGroups {
+ log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
+ for {
+ checkConsumeProgressCommand := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
+ reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand)
+ cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, checkConsumeProgressCommand)
+ output, err = cmd.Output()
+ if err != nil || !isCheckConsumeProcessCommandSuccess(string(output)) {
+ reqLogger.Error(err, "Failed to check consumerGroup "+consumerGroup+" with output: "+string(output))
+ // terminate the transfer process
+ undo = true
+ return reconcile.Result{Requeue: true}, err
+ }
+ reqLogger.Info(" output: " + string(output))
+ if isConsumeFinished(string(output), topic, sourceCluster) {
+ reqLogger.Info("Message consumption of " + consumerGroup + " in source cluster " + sourceCluster + " finished!")
+ break
+ }
+ reqLogger.Info("Wait a moment for message consumption of " + consumerGroup + " in source cluster " + sourceCluster + " finish ...")
+ time.Sleep(time.Duration(cons.CheckConsumeFinishIntervalInSecond) * time.Second)
+ }
}
- time.Sleep(time.Duration(5) * time.Second)
+
+ // step5: delete topic in source cluster
+ status = 5
+ deleteSourceClusterTopicCommand := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
+ reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand)
+ cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteSourceClusterTopicCommand)
+ output, err = cmd.Output()
+ if err != nil || !isDeleteTopicCommandSuccess(string(output)) {
+ reqLogger.Error(err, "Failed to delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+ // terminate the transfer process
+ undo = true
+ return reconcile.Result{Requeue: true}, err
+ }
+ reqLogger.Info("Successfully delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+
+ // step6: delete consumer group in source cluster
+ status = 6
+ for i, consumerGroup := range consumerGroups {
+ log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
+ deleteConsumerGroupCommand := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
+ reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand)
+ cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteConsumerGroupCommand)
+ output, err = cmd.Output()
+ if err != nil || !isDeleteConsumerGroupSuccess(string(output)) {
+ reqLogger.Error(err, "Failed to delete consumer group "+consumerGroup+" in SourceCluster "+sourceCluster+" with output: "+string(output))
+ // terminate the transfer process
+ undo = true
+ return reconcile.Result{Requeue: true}, err
+ }
+ reqLogger.Info("Successfully delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster + " with output: " + string(output))
+ }
+
+ // step7: create retry topic
+ status = 7
+ for i, consumerGroup := range consumerGroups {
+ log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
+ createRetryTopicCommand := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
+ reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand)
+ cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, createRetryTopicCommand)
+ output, err = cmd.Output()
+ if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+ reqLogger.Error(err, "Failed to create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + string(output))
+ // terminate the transfer process
+ undo = true
+ return reconcile.Result{Requeue: true}, err
+ }
+ reqLogger.Info("Successfully create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + string(output))
+ }
+
+ reqLogger.Info("Topic " + topic + " has been successfully transferred from " + sourceCluster + " to " + targetCluster)
}
- // step5: delete topic in source cluster
- deleteSourceClusterTopic := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
- deleteSourceClusterTopicCommand := strings.Join(deleteSourceClusterTopic, " ")
- reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand)
- cmd = exec.Command("sh", deleteSourceClusterTopicCommand)
- output, err = cmd.Output()
- if err != nil {
- reqLogger.Error(err, "Failed to delete Topic " + topic + " in SourceCluster " + sourceCluster)
- return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
- }
- // TODO: verify output
- reqLogger.Info("Successfully delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + string(output))
-
- // step6: delete consumer group in source cluster
- deleteConsumerGroup := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
- deleteConsumerGroupCommand := strings.Join(deleteConsumerGroup, " ")
- reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand)
- cmd = exec.Command("sh", deleteConsumerGroupCommand)
- output, err = cmd.Output()
- if err != nil {
- reqLogger.Error(err, "Failed to delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster)
- return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
- }
- // TODO: verify output
- reqLogger.Info("Successfully delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster + " with output: " + string(output))
-
- // step7: create retry topic
- createRetryTopic := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
- createRetryTopicCommand := strings.Join(createRetryTopic, " ")
- reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand)
- cmd = exec.Command("sh", createRetryTopicCommand)
- output, err = cmd.Output()
- if err != nil {
- reqLogger.Error(err, "Failed to create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster)
- return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(3) * time.Second}, err
- }
- // TODO: verify output
- reqLogger.Info("Successfully create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + string(output))
-
- reqLogger.Info("Topic " + topic + " has been transferred from " + sourceCluster + " to " + targetCluster)
return reconcile.Result{}, nil
}
-func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
+func undoStopWrite(topic string, cluster string, nameServer string) {
+ addTopicToClusterCommand := buildUndoStopWriteCommand(topic, cluster, nameServer)
+ log.Info("undoStopWrite: " + addTopicToClusterCommand)
+ cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand)
+ output, err := cmd.Output()
+ if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+ log.Error(err, "Failed to undo stop write topic with output: " + string(output))
+ }
+ log.Info("Successfully undo stop write topic with output: " + string(output))
+}
+
+func undoDeleteTopic(topic string, cluster string, nameServer string) {
+ addTopicToClusterCommand := buildAddTopicToClusterCommand(topic, cluster, nameServer)
+ log.Info("undoDeleteTopic: " + addTopicToClusterCommand)
+ cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand)
+ output, err := cmd.Output()
+ if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
+ log.Error(err, "Failed to undo delete topic with output: " + string(output))
+ }
+ log.Info("Successfully undo delete topic with output: " + string(output))
+}
+
+func undoDeleteConsumeGroup(consumerGroups []string, cluster string, nameServer string) {
+ for _, consumerGroup := range consumerGroups {
+ addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, cluster, nameServer)
+ log.Info("undoDeleteConsumeGroup: " + addConsumerGroupToTargetClusterCommand)
+ cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand)
+ output, err := cmd.Output()
+ if err != nil || !isUpdateConsumerGroupSuccess(string(output)) {
+ log.Error(err, "Failed to undo delete consume group with output: " + string(output))
+ }
+ log.Info("Successfully undo delete consume group with output: " + string(output))
+ }
+}
+
+func getConsumerGroupByTopic(topic string, nameServer string) []string {
+ var consumerGroups []string
+ topicListCmd := buildTopicListCommand(nameServer)
+ cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, topicListCmd)
+ output, err := cmd.Output()
+ if err != nil || !isTopicListSuccess(string(output)) {
+ log.Error(err, "Failed to list topic with output: " + string(output))
+ return nil
+ }
+ log.Info("topicListCmd output: " + string(output))
+
+ lines := strings.Split(string(output), "\n")
+ for i := 1; i < len(lines); i++ {
+ fields := strings.Fields(strings.TrimSpace(lines[i]))
+ if len(fields) >= cons.TopicListConsumerGroup {
+ if fields[cons.TopicListTopic] == topic {
+ consumerGroups = append(consumerGroups, fields[cons.TopicListConsumerGroup])
+ }
+ }
+ }
+ return consumerGroups
+}
+
+func isTopicListSuccess(s string) bool {
+ return strings.Contains(s, "#Topic") && strings.Contains(s, "#Consumer Group")
+}
+
+func isCheckConsumeProcessCommandSuccess(s string) bool {
+ return strings.Contains(s, "#Topic")
+}
+
+func isDeleteTopicCommandSuccess(s string) bool {
+ return strings.Contains(s, "delete topic") && strings.Contains(s, "success")
+}
+
+func isUpdateTopicCommandSuccess(s string) bool {
+ return strings.Contains(s, "create topic") && strings.Contains(s, "success")
+}
+
+func isDeleteConsumerGroupSuccess(s string) bool {
+ return strings.Contains(s, "delete subscription group") && strings.Contains(s, "success")
+}
+
+func isUpdateConsumerGroupSuccess(s string) bool {
+ // return strings.Contains(s, "create subscription group") && strings.Contains(s, "success")
+ return strings.Contains(s, "groupName")
+}
+
+func buildUndoStopWriteCommand(topic string, cluster string, nameServer string) string {
cmdOpts := []string{
- cons.AdminToolDir,
+ "updatetopic",
+ "-t",
+ topic,
+ "-c",
+ cluster,
+ "-r",
+ "8",
+ "-w",
+ "8",
+ "-p",
+ "6",
+ "-n",
+ nameServer,
+ }
+ return strings.Join(cmdOpts, " ")
+}
+
+func buildTopicListCommand(nameServer string) string {
+ cmdOpts := []string{
+ "topiclist",
+ "-c",
+ "-n",
+ nameServer,
+ }
+ return strings.Join(cmdOpts, " ")
+}
+
+func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) string {
+ cmdOpts := []string{
"updatetopic",
"-t",
"%RETRY%" + consumerGroup,
@@ -246,16 +415,38 @@
"-n",
nameServer,
}
- return cmdOpts
+ return strings.Join(cmdOpts, " ")
}
-func isConsumeFinished(output string) bool {
+func getClusterBrokerNames(cluster string) []string {
+ // TODO: consider more scenarios
+ return []string {cluster}
+}
+
+func isConsumeFinished(output string, topic string, cluster string) bool {
+ lines := strings.Split(output, "\n")
+ brokers := getClusterBrokerNames(cluster)
+ for i := 1; i < len(lines); i++ {
+ fields := strings.Fields(strings.TrimSpace(lines[i]))
+ if len(fields) > cons.DiffIndex {
+ for _, broker := range brokers {
+ log.Info("broker = " + broker)
+ log.Info("fields[cons.TopicIndex] = " + fields[cons.TopicIndex] + " , in line " + strconv.Itoa(i))
+ log.Info("fields[cons.BrokerNameIndex] = " + fields[cons.BrokerNameIndex] + " , in line " + strconv.Itoa(i))
+ log.Info("fields[cons.DiffIndex] = " + fields[cons.DiffIndex] + " , in line " + strconv.Itoa(i))
+ if fields[cons.TopicIndex] == topic && fields[cons.BrokerNameIndex] == broker {
+ if fields[cons.DiffIndex] != "0" {
+ return false
+ }
+ }
+ }
+ }
+ }
return true
}
-func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) []string {
+func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) string {
cmdOpts := []string{
- cons.AdminToolDir,
"deleteSubGroup",
"-g",
consumerGroup,
@@ -264,12 +455,11 @@
"-n",
nameServer,
}
- return cmdOpts
+ return strings.Join(cmdOpts, " ")
}
-func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) []string {
+func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) string {
cmdOpts := []string{
- cons.AdminToolDir,
"deletetopic",
"-t",
topic,
@@ -278,28 +468,22 @@
"-n",
nameServer,
}
- return cmdOpts
+ return strings.Join(cmdOpts, " ")
}
-func isAddConsumerGroupToTargetClusterOutputValid(s string) {
-
-}
-
-func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) []string {
+func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) string {
cmdOpts := []string{
- cons.AdminToolDir,
"consumerprogress",
"-g",
consumerGroup,
"-n",
nameServer,
}
- return cmdOpts
+ return strings.Join(cmdOpts, " ")
}
-func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) []string {
+func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) string {
cmdOpts := []string{
- cons.AdminToolDir,
"updatetopic",
"-t",
topic,
@@ -312,12 +496,11 @@
"-n",
nameServer,
}
- return cmdOpts
+ return strings.Join(cmdOpts, " ")
}
-func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
+func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) string {
cmdOpts := []string{
- cons.AdminToolDir,
"updatesubgroup",
"-g",
consumerGroup,
@@ -330,12 +513,11 @@
"-n",
nameServer,
}
- return cmdOpts
+ return strings.Join(cmdOpts, " ")
}
-func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) []string {
+func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) string {
cmdOpts := []string{
- cons.AdminToolDir,
"updatetopic",
"-t",
topic,
@@ -344,5 +526,5 @@
"-n",
nameServer,
}
- return cmdOpts
+ return strings.Join(cmdOpts, " ")
}
diff --git a/purge-operator.sh b/purge-operator.sh
index 81c75d0..f49bb32 100755
--- a/purge-operator.sh
+++ b/purge-operator.sh
@@ -25,4 +25,4 @@
kubectl delete -f deploy/service_account.yaml
kubectl delete -f deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
kubectl delete -f deploy/crds/rocketmq_v1alpha1_nameservice_crd.yaml
-
+kubectl delete -f deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml