feat(broker controller): copy metadata
diff --git a/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml b/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
index 20a5fa0..36fed58 100644
--- a/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
+++ b/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
@@ -51,9 +51,6 @@
replicationMode:
description: ReplicationMode is SYNC or ASYNC
type: string
- scaleContainerName:
- description: The name of container where the metadata from
- type: string
scalePodName:
description: The name of pod where the metadata from
type: string
@@ -82,7 +79,6 @@
- hostPath
- volumeClaimTemplates
- scalePodName
- - scaleContainerName
type: object
status:
properties:
diff --git a/example/rocketmq_v1alpha1_broker_cr.yaml b/example/rocketmq_v1alpha1_broker_cr.yaml
index f0099c5..cc89dc7 100644
--- a/example/rocketmq_v1alpha1_broker_cr.yaml
+++ b/example/rocketmq_v1alpha1_broker_cr.yaml
@@ -39,8 +39,6 @@
hostPath: /data/rocketmq/broker
# scalePodName is broker-[broker group number]-master-0
scalePodName: broker-0-master-0
- # scaleContainerName is broker-master[broker group number]
- scaleContainerName: broker-master-0
# volumeClaimTemplates defines the storageClass
volumeClaimTemplates:
- metadata:
diff --git a/pkg/apis/rocketmq/v1alpha1/broker_types.go b/pkg/apis/rocketmq/v1alpha1/broker_types.go
index abd8a71..f3a13c2 100644
--- a/pkg/apis/rocketmq/v1alpha1/broker_types.go
+++ b/pkg/apis/rocketmq/v1alpha1/broker_types.go
@@ -52,8 +52,6 @@
VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates"`
// The name of pod where the metadata from
ScalePodName string `json:"scalePodName"`
- // The name of container where the metadata from
- ScaleContainerName string `json:"scaleContainerName"`
}
// BrokerStatus defines the observed state of Broker
diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go
index 8e8a662..0e895f3 100644
--- a/pkg/constants/constants.go
+++ b/pkg/constants/constants.go
@@ -18,9 +18,11 @@
package constants
const BrokerClusterPrefix = "broker-cluster-"
-const MasterBrokerContainerNamePrefix = "broker-master-"
-const ReplicaBrokerContainerNamePrefix = "broker-replica-"
+const MasterBrokerContainerName = "broker-master"
+const ReplicaBrokerContainerName = "broker-replica"
const AdminToolDir = "/home/rocketmq/rocketmq-4.5.0/bin/mqadmin"
+const TopicJsonDir = "/home/rocketmq/store/config/topics.json"
+const SubscriptionGroupJsonDir = "/home/rocketmq/store/config/subscriptionGroup.json"
const UpdateBrokerConfig = "updateBrokerConfig"
const ParamNameServiceAddress = "namesrvAddr"
const EnvNameServiceAddress = "NAMESRV_ADDR"
diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go
index 1f21cb7..6b95c7e 100644
--- a/pkg/controller/broker/broker_controller.go
+++ b/pkg/controller/broker/broker_controller.go
@@ -19,6 +19,7 @@
import (
"context"
+ "reflect"
"strconv"
"strings"
"time"
@@ -31,6 +32,7 @@
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -126,37 +128,6 @@
return reconcile.Result{}, err
}
- k8s, err := tool.NewK8sClient()
- if err != nil {
- log.Error(err,"Error occurred while getting K8s Client" )
- return reconcile.Result{}, err
- }
-
- cmdOpts := []string{
- "env",
-
- }
- // cmdOpts = append(cmdOpts, "-database")
-
- log.Info("Command being run: " + strings.Join(cmdOpts, " "))
-
- podName := broker.Spec.ScalePodName
- container := broker.Spec.ScaleContainerName
- outputBytes, stderrBytes, err := k8s.Exec(broker.Namespace, podName, container, cmdOpts, nil)
- stderr := stderrBytes.String()
- output := outputBytes.String()
-
- if stderrBytes != nil {
- log.Info("STDERR: " + stderr)
- }
-
- if err != nil {
- log.Error(err, "Error occured while running command: " )
- log.Info("Stdout: " + output)
- } else {
- log.Info("Backup Output: " + output)
- }
-
share.GroupNum = int(broker.Spec.Size)
share.BrokerClusterName = broker.Name
replicaPerGroup := broker.Spec.ReplicaPerGroup
@@ -165,7 +136,7 @@
for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ {
brokerName := getBrokerName(broker, brokerGroupIndex)
reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum))
- dep := r.statefulSetForMasterBroker(broker, brokerGroupIndex)
+ dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0)
// Check if the statefulSet already exists, if not create a new one
found := &appsv1.StatefulSet{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
@@ -182,7 +153,7 @@
for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
- replicaDep := r.statefulSetForReplicaBroker(broker, brokerGroupIndex, replicaIndex)
+ replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
@@ -216,7 +187,7 @@
// update replicas brokers
for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
reqLogger.Info("Update Replica Broker NAMESRV_ADDR of " + brokerName + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
- replicaDep := r.statefulSetForReplicaBroker(broker, brokerGroupIndex, replicaIndex)
+ replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
replicaFound := &appsv1.StatefulSet{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, replicaFound)
if err != nil {
@@ -237,7 +208,74 @@
}
share.IsNameServersStrUpdated = false
- // Ensure the statefulSet size is the same as the spec
+ // List the pods for this broker's statefulSet
+ podList := &corev1.PodList{}
+ labelSelector := labels.SelectorFromSet(labelsForBroker(broker.Name))
+ listOps := &client.ListOptions{
+ Namespace: broker.Namespace,
+ LabelSelector: labelSelector,
+ }
+ err = r.client.List(context.TODO(), listOps, podList)
+ if err != nil {
+ reqLogger.Error(err, "Failed to list pods.", "Broker.Namespace", broker.Namespace, "Broker.Name", broker.Name)
+ return reconcile.Result{}, err
+ }
+ podNames := getPodNames(podList.Items)
+
+ // check if the broker scaling logic should be triggered
+ if !reflect.DeepEqual(podNames, broker.Status.Nodes) {
+ // make sure all new brokers pod are in the running phase
+ for _, pod := range podList.Items {
+ for {
+ if reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
+ log.Info("pod " + pod.Name + " is running")
+ break
+ }
+ log.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + " wait for a moment...")
+ time.Sleep(time.Duration(1) * time.Second)
+ }
+ }
+
+ // find all new broker pods
+ var newPodNames []string
+ for _, v1 := range podNames {
+ if !contains(v1, broker.Status.Nodes) {
+ newPodNames = append(newPodNames, v1)
+ }
+ }
+
+ // get the metadata including subscriptionGroup.json and topics.json from scale source pod
+ k8s, err := tool.NewK8sClient()
+ if err != nil {
+ log.Error(err,"Error occurred while getting K8s Client" )
+ }
+
+ sourcePodName := broker.Spec.ScalePodName
+
+ cmdOpts := buildInputCommand(cons.TopicJsonDir)
+ topicJson := exec(cmdOpts, sourcePodName, k8s, broker.Namespace)
+ cmdOpts = buildInputCommand(cons.SubscriptionGroupJsonDir)
+ subscriptionGroupJson := exec(cmdOpts, sourcePodName, k8s, broker.Namespace)
+
+ if len(topicJson) < 5 {
+ log.Info("The file topic.json or subscriptionGroup.json is abnormally too short to perform metadata transmission, please check whether the source broker pod is correct")
+ } else {
+ // for each new pod, copy the metadata from scale source pod
+ for _, newPodName := range newPodNames {
+ cmdOpts = buildOutputCommand(topicJson, cons.TopicJsonDir)
+ exec(cmdOpts, newPodName, k8s, broker.Namespace)
+ cmdOpts = buildOutputCommand(subscriptionGroupJson, cons.SubscriptionGroupJsonDir)
+ exec(cmdOpts, newPodName, k8s, broker.Namespace)
+ }
+
+ broker.Status.Nodes = podNames
+ err = r.client.Status().Update(context.TODO(), broker)
+ if err != nil {
+ reqLogger.Error(err, "Failed to update Broker status.")
+ }
+ }
+ }
+
//size := broker.Spec.Size
//if *found.Spec.Replicas != size {
// found.Spec.Replicas = &size
@@ -279,19 +317,76 @@
return reconcile.Result{true, time.Duration(3) * time.Second}, nil
}
+func buildInputCommand(source string) []string {
+ cmdOpts := []string{
+ "cat",
+ source,
+ }
+ return cmdOpts
+}
+
+func buildOutputCommand(content string, dest string) []string {
+ cmdOpts := []string{
+ "echo",
+ "-e",
+ "\"" + content + "\"",
+ ">",
+ dest,
+ }
+ return cmdOpts
+}
+
+func exec(cmdOpts []string, podName string, k8s *tool.K8sClient, namespace string) string {
+ log.Info("Command being run: " + strings.Join(cmdOpts, " "))
+ container := cons.MasterBrokerContainerName
+ outputBytes, stderrBytes, err := k8s.Exec(namespace, podName, container, cmdOpts, nil)
+ stderr := stderrBytes.String()
+ output := outputBytes.String()
+
+ if stderrBytes != nil {
+ log.Info("STDERR: " + stderr)
+ }
+
+ if err != nil {
+ log.Error(err, "Error occurred while running command: " + strings.Join(cmdOpts, " "))
+ log.Info("stdout: " + output)
+ } else {
+ log.Info("output: " + output)
+ }
+ return output
+}
+
+func contains(item string, arr []string) bool {
+ for _, value := range arr {
+ if reflect.DeepEqual(value, item) {
+ return true
+ }
+ }
+ return false
+}
func getBrokerName(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int) string {
return broker.Name + "-" + strconv.Itoa(brokerGroupIndex)
}
-// statefulSetForBroker returns a master broker StatefulSet object
-func (r *ReconcileBroker) statefulSetForMasterBroker(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int) *appsv1.StatefulSet {
+// getBrokerStatefulSet returns a broker StatefulSet object
+func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int) *appsv1.StatefulSet {
ls := labelsForBroker(broker.Name)
var a int32 = 1
var c = &a
+ var containerName string
+ var statefulSetName string
+ if replicaIndex == 0 {
+ containerName = cons.MasterBrokerContainerName
+ statefulSetName = broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-master"
+ } else {
+ containerName = cons.ReplicaBrokerContainerName
+ statefulSetName = broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-replica-" + strconv.Itoa(replicaIndex)
+ }
+
dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
- Name: broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-master",
+ Name: statefulSetName,
Namespace: broker.Namespace,
},
Spec: appsv1.StatefulSetSpec{
@@ -309,80 +404,7 @@
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: broker.Spec.BrokerImage,
- Name: cons.MasterBrokerContainerNamePrefix + strconv.Itoa(brokerGroupIndex),
- ImagePullPolicy: broker.Spec.ImagePullPolicy,
- Env: []corev1.EnvVar{{
- Name: cons.EnvNameServiceAddress,
- Value: broker.Spec.NameServers,
- }, {
- Name: cons.EnvReplicationMode,
- Value: broker.Spec.ReplicationMode,
- }, {
- Name: cons.EnvBrokerId,
- Value: "0",
- }, {
- Name: cons.EnvBrokerClusterName,
- Value: broker.Name,
- }, {
- Name: cons.EnvBrokerName,
- Value: broker.Name + "-" + strconv.Itoa(brokerGroupIndex),
- }},
- Ports: []corev1.ContainerPort{{
- ContainerPort: cons.BrokerVipContainerPort,
- Name: cons.BrokerVipContainerPortName,
- }, {
- ContainerPort: cons.BrokerMainContainerPort,
- Name: cons.BrokerMainContainerPortName,
- }, {
- ContainerPort: cons.BrokerHighAvailabilityContainerPort,
- Name: cons.BrokerHighAvailabilityContainerPortName,
- }},
- VolumeMounts: []corev1.VolumeMount{{
- MountPath: cons.LogMountPath,
- Name: broker.Spec.VolumeClaimTemplates[0].Name,
- SubPath: cons.LogSubPathName + getPathSuffix(broker, brokerGroupIndex, 0),
- },{
- MountPath: cons.StoreMountPath,
- Name: broker.Spec.VolumeClaimTemplates[0].Name,
- SubPath: cons.StoreSubPathName + getPathSuffix(broker, brokerGroupIndex, 0),
- }},
- }},
- Volumes: getVolumes(broker),
- },
- },
- VolumeClaimTemplates: getVolumeClaimTemplates(broker),
- },
- }
- // Set Broker instance as the owner and controller
- controllerutil.SetControllerReference(broker, dep, r.scheme)
-
- return dep
-
-}
-
-// statefulSetForBroker returns a replica broker StatefulSet object
-func (r *ReconcileBroker) statefulSetForReplicaBroker(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int) *appsv1.StatefulSet {
- ls := labelsForBroker(broker.Name)
- var a int32 = 1
- var c = &a
- dep := &appsv1.StatefulSet{
- ObjectMeta: metav1.ObjectMeta{
- Name: broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-replica-" + strconv.Itoa(replicaIndex),
- Namespace: broker.Namespace,
- },
- Spec: appsv1.StatefulSetSpec{
- Replicas: c,
- Selector: &metav1.LabelSelector{
- MatchLabels: ls,
- },
- Template: corev1.PodTemplateSpec{
- ObjectMeta: metav1.ObjectMeta{
- Labels: ls,
- },
- Spec: corev1.PodSpec{
- Containers: []corev1.Container{{
- Image: broker.Spec.BrokerImage,
- Name: cons.ReplicaBrokerContainerNamePrefix + strconv.Itoa(brokerGroupIndex),
+ Name: containerName,
ImagePullPolicy: broker.Spec.ImagePullPolicy,
Env: []corev1.EnvVar{{
Name: cons.EnvNameServiceAddress,