use statefulSet in broker CRD
diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go
index 2d738f2..91780d4 100644
--- a/pkg/controller/broker/broker_controller.go
+++ b/pkg/controller/broker/broker_controller.go
@@ -70,7 +70,7 @@
 
 	// TODO(user): Modify this to be the types you create that are owned by the primary resource
 	// Watch for changes to secondary resource Pods and requeue the owner Broker
-	err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
+	err = c.Watch(&source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
 		IsController: true,
 		OwnerType:    &cachev1alpha1.Broker{},
 	})
@@ -95,7 +95,7 @@
 // Reconcile reads that state of the cluster for a Broker object and makes changes based on the state read
 // and what is in the Broker.Spec
 // TODO(user): Modify this Reconcile function to implement your Controller logic.  This example creates
-// a Broker Deployment for each Broker CR
+// a Broker StatefulSet for each Broker CR
 // Note:
 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
@@ -119,8 +119,8 @@
 		return reconcile.Result{}, err
 	}
 
-	// Check if the deployment already exists, if not create a new one
-	found := &appsv1.Deployment{}
+	// Check if the statefulSet already exists, if not create a new one
+	found := &appsv1.StatefulSet{}
 
 	share.GroupNum = int(broker.Spec.Size)
 	slavePerGroup := broker.Spec.SlavePerGroup
@@ -128,31 +128,31 @@
 
 	for brokerClusterIndex := 0; brokerClusterIndex < share.GroupNum; brokerClusterIndex++ {
 		reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerClusterIndex+1) + "/" + strconv.Itoa(share.GroupNum))
-		dep := r.deploymentForMasterBroker(broker, brokerClusterIndex)
+		dep := r.statefulSetForMasterBroker(broker, brokerClusterIndex)
 		err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
 
 		if err != nil && errors.IsNotFound(err) {
-			reqLogger.Info("Creating a new Master Broker Deployment.", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
+			reqLogger.Info("Creating a new Master Broker StatefulSet.", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
 			err = r.client.Create(context.TODO(), dep)
 			for slaveIndex := 1; slaveIndex <= slavePerGroup; slaveIndex++ {
 				reqLogger.Info("Check Slave Broker of cluster-" + strconv.Itoa(brokerClusterIndex) + " " + strconv.Itoa(slaveIndex) + "/" + strconv.Itoa(slavePerGroup))
-				slaveDep := r.deploymentForSlaveBroker(broker, brokerClusterIndex, slaveIndex)
+				slaveDep := r.statefulSetForSlaveBroker(broker, brokerClusterIndex, slaveIndex)
 				err = r.client.Get(context.TODO(), types.NamespacedName{Name: slaveDep.Name, Namespace: slaveDep.Namespace}, found)
 				if err != nil && errors.IsNotFound(err) {
-					reqLogger.Info("Creating a new Slave Broker Deployment.", "Deployment.Namespace", slaveDep.Namespace, "Deployment.Name", slaveDep.Name)
+					reqLogger.Info("Creating a new Slave Broker StatefulSet.", "StatefulSet.Namespace", slaveDep.Namespace, "StatefulSet.Name", slaveDep.Name)
 					err = r.client.Create(context.TODO(), slaveDep)
 					if err != nil {
-						reqLogger.Error(err, "Failed to create new Deployment of broker-"+strconv.Itoa(brokerClusterIndex)+"-slave-"+strconv.Itoa(slaveIndex), "Deployment.Namespace", slaveDep.Namespace, "Deployment.Name", slaveDep.Name)
+						reqLogger.Error(err, "Failed to create new StatefulSet of broker-"+strconv.Itoa(brokerClusterIndex)+"-slave-"+strconv.Itoa(slaveIndex), "StatefulSet.Namespace", slaveDep.Namespace, "StatefulSet.Name", slaveDep.Name)
 					}
 				} else if err != nil {
-					reqLogger.Error(err, "Failed to get broker slave Deployment.")
+					reqLogger.Error(err, "Failed to get broker slave StatefulSet.")
 				}
 			}
 			if err != nil {
-				reqLogger.Error(err, "Failed to create new Deployment of "+cons.BrokerClusterPrefix+strconv.Itoa(brokerClusterIndex), "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
+				reqLogger.Error(err, "Failed to create new StatefulSet of "+cons.BrokerClusterPrefix+strconv.Itoa(brokerClusterIndex), "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
 			}
 		} else if err != nil {
-			reqLogger.Error(err, "Failed to get broker master Deployment.")
+			reqLogger.Error(err, "Failed to get broker master StatefulSet.")
 		}
 
 		// The following code will restart all brokers to update NAMESRV_ADDR env
@@ -161,29 +161,29 @@
 			found.Spec.Template.Spec.Containers[0].Env[0].Value = share.MetaServersStr
 			err = r.client.Update(context.TODO(), found)
 			if err != nil {
-				reqLogger.Error(err, "Failed to update NAMESRV_ADDR of master broker in "+cons.BrokerClusterPrefix+strconv.Itoa(brokerClusterIndex), "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
+				reqLogger.Error(err, "Failed to update NAMESRV_ADDR of master broker in "+cons.BrokerClusterPrefix+strconv.Itoa(brokerClusterIndex), "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
 			}
 			for slaveIndex := 1; slaveIndex <= slavePerGroup; slaveIndex++ {
 				reqLogger.Info("Update Slave Broker NAMESRV_ADDR of cluster-" + strconv.Itoa(brokerClusterIndex) + " " + strconv.Itoa(slaveIndex) + "/" + strconv.Itoa(slavePerGroup))
-				slaveDep := r.deploymentForSlaveBroker(broker, brokerClusterIndex, slaveIndex)
+				slaveDep := r.statefulSetForSlaveBroker(broker, brokerClusterIndex, slaveIndex)
 				err = r.client.Get(context.TODO(), types.NamespacedName{Name: slaveDep.Name, Namespace: slaveDep.Namespace}, found)
 				found.Spec.Template.Spec.Containers[0].Env[0].Value = share.MetaServersStr
 				err = r.client.Update(context.TODO(), found)
 				if err != nil {
-					reqLogger.Error(err, "Failed to update NAMESRV_ADDR of broker-"+strconv.Itoa(brokerClusterIndex)+"-slave-"+strconv.Itoa(slaveIndex), "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
+					reqLogger.Error(err, "Failed to update NAMESRV_ADDR of broker-"+strconv.Itoa(brokerClusterIndex)+"-slave-"+strconv.Itoa(slaveIndex), "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
 				}
 			}
 		}*/
 	}
 	share.IsMetaServersStrUpdated = false
 
-	// Ensure the deployment size is the same as the spec
+	// Ensure the statefulSet size is the same as the spec
 	//size := broker.Spec.Size
 	//if *found.Spec.Replicas != size {
 	//	found.Spec.Replicas = &size
 	//	err = r.client.Update(context.TODO(), found)
 	//	if err != nil {
-	//		reqLogger.Error(err, "Failed to update Deployment.", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
+	//		reqLogger.Error(err, "Failed to update StatefulSet.", "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
 	//		return reconcile.Result{}, err
 	//	}
 	//	// Spec updated - return and requeue
@@ -191,7 +191,7 @@
 	//}
 
 	// Update the Broker status with the pod names
-	// List the pods for this broker's deployment
+	// List the pods for this broker's statefulSet
 
 	//podList := &corev1.PodList{}
 	//labelSelector := labels.SelectorFromSet(labelsForBroker(broker.Name))
@@ -219,17 +219,17 @@
 	return reconcile.Result{true, time.Duration(3) * time.Second}, nil
 }
 
-// deploymentForBroker returns a master broker Deployment object
-func (r *ReconcileBroker) deploymentForMasterBroker(m *cachev1alpha1.Broker, brokerClusterIndex int) *appsv1.Deployment {
+// statefulSetForBroker returns a master broker StatefulSet object
+func (r *ReconcileBroker) statefulSetForMasterBroker(m *cachev1alpha1.Broker, brokerClusterIndex int) *appsv1.StatefulSet {
 	ls := labelsForBroker(m.Name)
 	var a int32 = 1
 	var c = &a
-	dep := &appsv1.Deployment{
+	dep := &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      m.Name + "-" + strconv.Itoa(brokerClusterIndex) + "-master",
 			Namespace: m.Namespace,
 		},
-		Spec: appsv1.DeploymentSpec{
+		Spec: appsv1.StatefulSetSpec{
 			Replicas: c,
 			Selector: &metav1.LabelSelector{
 				MatchLabels: ls,
@@ -279,17 +279,17 @@
 
 }
 
-// deploymentForBroker returns a slave broker Deployment object
-func (r *ReconcileBroker) deploymentForSlaveBroker(m *cachev1alpha1.Broker, brokerClusterIndex int, slaveIndex int) *appsv1.Deployment {
+// statefulSetForBroker returns a slave broker StatefulSet object
+func (r *ReconcileBroker) statefulSetForSlaveBroker(m *cachev1alpha1.Broker, brokerClusterIndex int, slaveIndex int) *appsv1.StatefulSet {
 	ls := labelsForBroker(m.Name)
 	var a int32 = 1
 	var c = &a
-	dep := &appsv1.Deployment{
+	dep := &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      m.Name + "-" + strconv.Itoa(brokerClusterIndex) + "-slave-" + strconv.Itoa(slaveIndex),
 			Namespace: m.Namespace,
 		},
-		Spec: appsv1.DeploymentSpec{
+		Spec: appsv1.StatefulSetSpec{
 			Replicas: c,
 			Selector: &metav1.LabelSelector{
 				MatchLabels: ls,