feat(broker controller): copy metadata
diff --git a/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml b/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
index 20a5fa0..36fed58 100644
--- a/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
+++ b/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml
@@ -51,9 +51,6 @@
             replicationMode:
               description: ReplicationMode is SYNC or ASYNC
               type: string
-            scaleContainerName:
-              description: The name of container where the metadata from
-              type: string
             scalePodName:
               description: The name of pod where the metadata from
               type: string
@@ -82,7 +79,6 @@
           - hostPath
           - volumeClaimTemplates
           - scalePodName
-          - scaleContainerName
           type: object
         status:
           properties:
diff --git a/example/rocketmq_v1alpha1_broker_cr.yaml b/example/rocketmq_v1alpha1_broker_cr.yaml
index f0099c5..cc89dc7 100644
--- a/example/rocketmq_v1alpha1_broker_cr.yaml
+++ b/example/rocketmq_v1alpha1_broker_cr.yaml
@@ -39,8 +39,6 @@
   hostPath: /data/rocketmq/broker
   # scalePodName is broker-[broker group number]-master-0
   scalePodName: broker-0-master-0
-  # scaleContainerName is broker-master[broker group number]
-  scaleContainerName: broker-master-0
   # volumeClaimTemplates defines the storageClass
   volumeClaimTemplates:
     - metadata:
diff --git a/pkg/apis/rocketmq/v1alpha1/broker_types.go b/pkg/apis/rocketmq/v1alpha1/broker_types.go
index abd8a71..f3a13c2 100644
--- a/pkg/apis/rocketmq/v1alpha1/broker_types.go
+++ b/pkg/apis/rocketmq/v1alpha1/broker_types.go
@@ -52,8 +52,6 @@
 	VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates"`
 	// The name of pod where the metadata from
 	ScalePodName string `json:"scalePodName"`
-	// The name of container where the metadata from
-	ScaleContainerName string `json:"scaleContainerName"`
 }
 
 // BrokerStatus defines the observed state of Broker
diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go
index 8e8a662..0e895f3 100644
--- a/pkg/constants/constants.go
+++ b/pkg/constants/constants.go
@@ -18,9 +18,11 @@
 package constants
 
 const BrokerClusterPrefix = "broker-cluster-"
-const MasterBrokerContainerNamePrefix = "broker-master-"
-const ReplicaBrokerContainerNamePrefix = "broker-replica-"
+const MasterBrokerContainerName = "broker-master"
+const ReplicaBrokerContainerName = "broker-replica"
 const AdminToolDir = "/home/rocketmq/rocketmq-4.5.0/bin/mqadmin"
+const TopicJsonDir = "/home/rocketmq/store/config/topics.json"
+const SubscriptionGroupJsonDir = "/home/rocketmq/store/config/subscriptionGroup.json"
 const UpdateBrokerConfig  = "updateBrokerConfig"
 const ParamNameServiceAddress = "namesrvAddr"
 const EnvNameServiceAddress = "NAMESRV_ADDR"
diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go
index 1f21cb7..6b95c7e 100644
--- a/pkg/controller/broker/broker_controller.go
+++ b/pkg/controller/broker/broker_controller.go
@@ -19,6 +19,7 @@
 
 import (
 	"context"
+	"reflect"
 	"strconv"
 	"strings"
 	"time"
@@ -31,6 +32,7 @@
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -126,37 +128,6 @@
 		return reconcile.Result{}, err
 	}
 
-	k8s, err := tool.NewK8sClient()
-	if err != nil {
-		log.Error(err,"Error occurred while getting K8s Client" )
-		return reconcile.Result{}, err
-	}
-
-	cmdOpts := []string{
-		"env",
-
-	}
-	// cmdOpts = append(cmdOpts, "-database")
-
-	log.Info("Command being run: " + strings.Join(cmdOpts, " "))
-
-	podName := broker.Spec.ScalePodName
-	container := broker.Spec.ScaleContainerName
-	outputBytes, stderrBytes, err := k8s.Exec(broker.Namespace, podName, container, cmdOpts, nil)
-	stderr := stderrBytes.String()
-	output := outputBytes.String()
-
-	if stderrBytes != nil {
-		log.Info("STDERR: " + stderr)
-	}
-
-	if err != nil {
-		log.Error(err, "Error occured while running command: " )
-		log.Info("Stdout: " + output)
-	} else {
-		log.Info("Backup Output: " + output)
-	}
-
 	share.GroupNum = int(broker.Spec.Size)
 	share.BrokerClusterName = broker.Name
 	replicaPerGroup := broker.Spec.ReplicaPerGroup
@@ -165,7 +136,7 @@
 	for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ {
 		brokerName := getBrokerName(broker, brokerGroupIndex)
 		reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum))
-		dep := r.statefulSetForMasterBroker(broker, brokerGroupIndex)
+		dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0)
 		// Check if the statefulSet already exists, if not create a new one
 		found := &appsv1.StatefulSet{}
 		err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
@@ -182,7 +153,7 @@
 
 		for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
 			reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
-			replicaDep := r.statefulSetForReplicaBroker(broker, brokerGroupIndex, replicaIndex)
+			replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
 			err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found)
 			if err != nil && errors.IsNotFound(err) {
 				reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
@@ -216,7 +187,7 @@
 				// update replicas brokers
 				for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
 					reqLogger.Info("Update Replica Broker NAMESRV_ADDR of " + brokerName + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
-					replicaDep := r.statefulSetForReplicaBroker(broker, brokerGroupIndex, replicaIndex)
+					replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
 					replicaFound := &appsv1.StatefulSet{}
 					err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, replicaFound)
 					if err != nil {
@@ -237,7 +208,74 @@
 	}
 	share.IsNameServersStrUpdated = false
 
-	// Ensure the statefulSet size is the same as the spec
+	// List the pods for this broker's statefulSet
+	podList := &corev1.PodList{}
+	labelSelector := labels.SelectorFromSet(labelsForBroker(broker.Name))
+	listOps := &client.ListOptions{
+		Namespace:     broker.Namespace,
+		LabelSelector: labelSelector,
+	}
+	err = r.client.List(context.TODO(), listOps, podList)
+	if err != nil {
+		reqLogger.Error(err, "Failed to list pods.", "Broker.Namespace", broker.Namespace, "Broker.Name", broker.Name)
+		return reconcile.Result{}, err
+	}
+	podNames := getPodNames(podList.Items)
+
+	// check if the broker scaling logic should be triggered
+	if !reflect.DeepEqual(podNames, broker.Status.Nodes) {
+		// make sure all new brokers pod are in the running phase
+		for _, pod := range podList.Items {
+			for {
+				if reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
+					log.Info("pod " + pod.Name + " is running")
+					break
+				}
+				log.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + " wait for a moment...")
+				time.Sleep(time.Duration(1) * time.Second)
+			}
+		}
+
+		// find all new broker pods
+		var newPodNames []string
+		for _, v1 := range podNames {
+			if !contains(v1, broker.Status.Nodes) {
+				newPodNames = append(newPodNames, v1)
+			}
+		}
+
+		// get the metadata including subscriptionGroup.json and topics.json from scale source pod
+		k8s, err := tool.NewK8sClient()
+		if err != nil {
+			log.Error(err,"Error occurred while getting K8s Client" )
+		}
+
+		sourcePodName := broker.Spec.ScalePodName
+
+		cmdOpts := buildInputCommand(cons.TopicJsonDir)
+		topicJson := exec(cmdOpts, sourcePodName, k8s, broker.Namespace)
+		cmdOpts = buildInputCommand(cons.SubscriptionGroupJsonDir)
+		subscriptionGroupJson := exec(cmdOpts, sourcePodName, k8s, broker.Namespace)
+
+		if len(topicJson) < 5 {
+			log.Info("The file topic.json or subscriptionGroup.json is abnormally too short to perform metadata transmission, please check whether the source broker pod is correct")
+		} else {
+			// for each new pod, copy the metadata from scale source pod
+			for _, newPodName := range newPodNames {
+				cmdOpts = buildOutputCommand(topicJson, cons.TopicJsonDir)
+				exec(cmdOpts, newPodName, k8s, broker.Namespace)
+				cmdOpts = buildOutputCommand(subscriptionGroupJson, cons.SubscriptionGroupJsonDir)
+				exec(cmdOpts, newPodName, k8s, broker.Namespace)
+			}
+
+			broker.Status.Nodes = podNames
+			err = r.client.Status().Update(context.TODO(), broker)
+			if err != nil {
+				reqLogger.Error(err, "Failed to update Broker status.")
+			}
+		}
+	}
+
 	//size := broker.Spec.Size
 	//if *found.Spec.Replicas != size {
 	//	found.Spec.Replicas = &size
@@ -279,19 +317,76 @@
 	return reconcile.Result{true, time.Duration(3) * time.Second}, nil
 }
 
+func buildInputCommand(source string) []string {
+	cmdOpts := []string{
+		"cat",
+		source,
+	}
+	return cmdOpts
+}
+
+func buildOutputCommand(content string, dest string) []string {
+	cmdOpts := []string{
+		"echo",
+		"-e",
+		"\"" + content + "\"",
+		">",
+		dest,
+	}
+	return cmdOpts
+}
+
+func exec(cmdOpts []string, podName string, k8s *tool.K8sClient, namespace string) string {
+	log.Info("Command being run: " + strings.Join(cmdOpts, " "))
+	container := cons.MasterBrokerContainerName
+	outputBytes, stderrBytes, err := k8s.Exec(namespace, podName, container, cmdOpts, nil)
+	stderr := stderrBytes.String()
+	output := outputBytes.String()
+
+	if stderrBytes != nil {
+		log.Info("STDERR: " + stderr)
+	}
+
+	if err != nil {
+		log.Error(err, "Error occurred while running command: " + strings.Join(cmdOpts, " "))
+		log.Info("stdout: " + output)
+	} else {
+		log.Info("output: " + output)
+	}
+	return output
+}
+
+func contains(item string, arr []string) bool {
+	for _, value := range arr {
+		if reflect.DeepEqual(value, item) {
+			return true
+		}
+	}
+	return false
+}
 
 func getBrokerName(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int) string {
 	return broker.Name + "-" + strconv.Itoa(brokerGroupIndex)
 }
 
-// statefulSetForBroker returns a master broker StatefulSet object
-func (r *ReconcileBroker) statefulSetForMasterBroker(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int) *appsv1.StatefulSet {
+// getBrokerStatefulSet returns a broker StatefulSet object
+func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int) *appsv1.StatefulSet {
 	ls := labelsForBroker(broker.Name)
 	var a int32 = 1
 	var c = &a
+	var containerName string
+	var statefulSetName string
+	if replicaIndex == 0 {
+		containerName = cons.MasterBrokerContainerName
+		statefulSetName = broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-master"
+	} else {
+		containerName = cons.ReplicaBrokerContainerName
+		statefulSetName = broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-replica-" + strconv.Itoa(replicaIndex)
+	}
+
 	dep := &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-master",
+			Name:      statefulSetName,
 			Namespace: broker.Namespace,
 		},
 		Spec: appsv1.StatefulSetSpec{
@@ -309,80 +404,7 @@
 				Spec: corev1.PodSpec{
 					Containers: []corev1.Container{{
 						Image:           broker.Spec.BrokerImage,
-						Name:            cons.MasterBrokerContainerNamePrefix + strconv.Itoa(brokerGroupIndex),
-						ImagePullPolicy: broker.Spec.ImagePullPolicy,
-						Env: []corev1.EnvVar{{
-							Name:  cons.EnvNameServiceAddress,
-							Value: broker.Spec.NameServers,
-						}, {
-							Name:  cons.EnvReplicationMode,
-							Value: broker.Spec.ReplicationMode,
-						}, {
-							Name:  cons.EnvBrokerId,
-							Value: "0",
-						}, {
-							Name:  cons.EnvBrokerClusterName,
-							Value: broker.Name,
-						}, {
-							Name:  cons.EnvBrokerName,
-							Value: broker.Name + "-" + strconv.Itoa(brokerGroupIndex),
-						}},
-						Ports: []corev1.ContainerPort{{
-							ContainerPort: cons.BrokerVipContainerPort,
-							Name:          cons.BrokerVipContainerPortName,
-						}, {
-							ContainerPort: cons.BrokerMainContainerPort,
-							Name:          cons.BrokerMainContainerPortName,
-						}, {
-							ContainerPort: cons.BrokerHighAvailabilityContainerPort,
-							Name:          cons.BrokerHighAvailabilityContainerPortName,
-						}},
-						VolumeMounts: []corev1.VolumeMount{{
-							MountPath: cons.LogMountPath,
-							Name:      broker.Spec.VolumeClaimTemplates[0].Name,
-							SubPath:   cons.LogSubPathName + getPathSuffix(broker, brokerGroupIndex, 0),
-						},{
-							MountPath: cons.StoreMountPath,
-							Name:      broker.Spec.VolumeClaimTemplates[0].Name,
-							SubPath:   cons.StoreSubPathName + getPathSuffix(broker, brokerGroupIndex, 0),
-						}},
-					}},
-					Volumes: getVolumes(broker),
-				},
-			},
-			VolumeClaimTemplates: getVolumeClaimTemplates(broker),
-		},
-	}
-	// Set Broker instance as the owner and controller
-	controllerutil.SetControllerReference(broker, dep, r.scheme)
-
-	return dep
-
-}
-
-// statefulSetForBroker returns a replica broker StatefulSet object
-func (r *ReconcileBroker) statefulSetForReplicaBroker(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int) *appsv1.StatefulSet {
-	ls := labelsForBroker(broker.Name)
-	var a int32 = 1
-	var c = &a
-	dep := &appsv1.StatefulSet{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-replica-" + strconv.Itoa(replicaIndex),
-			Namespace: broker.Namespace,
-		},
-		Spec: appsv1.StatefulSetSpec{
-			Replicas: c,
-			Selector: &metav1.LabelSelector{
-				MatchLabels: ls,
-			},
-			Template: corev1.PodTemplateSpec{
-				ObjectMeta: metav1.ObjectMeta{
-					Labels: ls,
-				},
-				Spec: corev1.PodSpec{
-					Containers: []corev1.Container{{
-						Image:           broker.Spec.BrokerImage,
-						Name:            cons.ReplicaBrokerContainerNamePrefix + strconv.Itoa(brokerGroupIndex),
+						Name:            containerName,
 						ImagePullPolicy: broker.Spec.ImagePullPolicy,
 						Env: []corev1.EnvVar{{
 							Name:  cons.EnvNameServiceAddress,