use statefulSet in broker CRD
diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go
index 2d738f2..91780d4 100644
--- a/pkg/controller/broker/broker_controller.go
+++ b/pkg/controller/broker/broker_controller.go
@@ -70,7 +70,7 @@
// TODO(user): Modify this to be the types you create that are owned by the primary resource
// Watch for changes to secondary resource Pods and requeue the owner Broker
- err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
+ err = c.Watch(&source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &cachev1alpha1.Broker{},
})
@@ -95,7 +95,7 @@
// Reconcile reads that state of the cluster for a Broker object and makes changes based on the state read
// and what is in the Broker.Spec
// TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates
-// a Broker Deployment for each Broker CR
+// a Broker StatefulSet for each Broker CR
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
@@ -119,8 +119,8 @@
return reconcile.Result{}, err
}
- // Check if the deployment already exists, if not create a new one
- found := &appsv1.Deployment{}
+ // Check if the statefulSet already exists, if not create a new one
+ found := &appsv1.StatefulSet{}
share.GroupNum = int(broker.Spec.Size)
slavePerGroup := broker.Spec.SlavePerGroup
@@ -128,31 +128,31 @@
for brokerClusterIndex := 0; brokerClusterIndex < share.GroupNum; brokerClusterIndex++ {
reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerClusterIndex+1) + "/" + strconv.Itoa(share.GroupNum))
- dep := r.deploymentForMasterBroker(broker, brokerClusterIndex)
+ dep := r.statefulSetForMasterBroker(broker, brokerClusterIndex)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
- reqLogger.Info("Creating a new Master Broker Deployment.", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
+ reqLogger.Info("Creating a new Master Broker StatefulSet.", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
err = r.client.Create(context.TODO(), dep)
for slaveIndex := 1; slaveIndex <= slavePerGroup; slaveIndex++ {
reqLogger.Info("Check Slave Broker of cluster-" + strconv.Itoa(brokerClusterIndex) + " " + strconv.Itoa(slaveIndex) + "/" + strconv.Itoa(slavePerGroup))
- slaveDep := r.deploymentForSlaveBroker(broker, brokerClusterIndex, slaveIndex)
+ slaveDep := r.statefulSetForSlaveBroker(broker, brokerClusterIndex, slaveIndex)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: slaveDep.Name, Namespace: slaveDep.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
- reqLogger.Info("Creating a new Slave Broker Deployment.", "Deployment.Namespace", slaveDep.Namespace, "Deployment.Name", slaveDep.Name)
+ reqLogger.Info("Creating a new Slave Broker StatefulSet.", "StatefulSet.Namespace", slaveDep.Namespace, "StatefulSet.Name", slaveDep.Name)
err = r.client.Create(context.TODO(), slaveDep)
if err != nil {
- reqLogger.Error(err, "Failed to create new Deployment of broker-"+strconv.Itoa(brokerClusterIndex)+"-slave-"+strconv.Itoa(slaveIndex), "Deployment.Namespace", slaveDep.Namespace, "Deployment.Name", slaveDep.Name)
+ reqLogger.Error(err, "Failed to create new StatefulSet of broker-"+strconv.Itoa(brokerClusterIndex)+"-slave-"+strconv.Itoa(slaveIndex), "StatefulSet.Namespace", slaveDep.Namespace, "StatefulSet.Name", slaveDep.Name)
}
} else if err != nil {
- reqLogger.Error(err, "Failed to get broker slave Deployment.")
+ reqLogger.Error(err, "Failed to get broker slave StatefulSet.")
}
}
if err != nil {
- reqLogger.Error(err, "Failed to create new Deployment of "+cons.BrokerClusterPrefix+strconv.Itoa(brokerClusterIndex), "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
+ reqLogger.Error(err, "Failed to create new StatefulSet of "+cons.BrokerClusterPrefix+strconv.Itoa(brokerClusterIndex), "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
}
} else if err != nil {
- reqLogger.Error(err, "Failed to get broker master Deployment.")
+ reqLogger.Error(err, "Failed to get broker master StatefulSet.")
}
// The following code will restart all brokers to update NAMESRV_ADDR env
@@ -161,29 +161,29 @@
found.Spec.Template.Spec.Containers[0].Env[0].Value = share.MetaServersStr
err = r.client.Update(context.TODO(), found)
if err != nil {
- reqLogger.Error(err, "Failed to update NAMESRV_ADDR of master broker in "+cons.BrokerClusterPrefix+strconv.Itoa(brokerClusterIndex), "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
+ reqLogger.Error(err, "Failed to update NAMESRV_ADDR of master broker in "+cons.BrokerClusterPrefix+strconv.Itoa(brokerClusterIndex), "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
}
for slaveIndex := 1; slaveIndex <= slavePerGroup; slaveIndex++ {
reqLogger.Info("Update Slave Broker NAMESRV_ADDR of cluster-" + strconv.Itoa(brokerClusterIndex) + " " + strconv.Itoa(slaveIndex) + "/" + strconv.Itoa(slavePerGroup))
- slaveDep := r.deploymentForSlaveBroker(broker, brokerClusterIndex, slaveIndex)
+ slaveDep := r.statefulSetForSlaveBroker(broker, brokerClusterIndex, slaveIndex)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: slaveDep.Name, Namespace: slaveDep.Namespace}, found)
found.Spec.Template.Spec.Containers[0].Env[0].Value = share.MetaServersStr
err = r.client.Update(context.TODO(), found)
if err != nil {
- reqLogger.Error(err, "Failed to update NAMESRV_ADDR of broker-"+strconv.Itoa(brokerClusterIndex)+"-slave-"+strconv.Itoa(slaveIndex), "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
+ reqLogger.Error(err, "Failed to update NAMESRV_ADDR of broker-"+strconv.Itoa(brokerClusterIndex)+"-slave-"+strconv.Itoa(slaveIndex), "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
}
}
}*/
}
share.IsMetaServersStrUpdated = false
- // Ensure the deployment size is the same as the spec
+ // Ensure the statefulSet size is the same as the spec
//size := broker.Spec.Size
//if *found.Spec.Replicas != size {
// found.Spec.Replicas = &size
// err = r.client.Update(context.TODO(), found)
// if err != nil {
- // reqLogger.Error(err, "Failed to update Deployment.", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
+ // reqLogger.Error(err, "Failed to update StatefulSet.", "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
// return reconcile.Result{}, err
// }
// // Spec updated - return and requeue
@@ -191,7 +191,7 @@
//}
// Update the Broker status with the pod names
- // List the pods for this broker's deployment
+ // List the pods for this broker's statefulSet
//podList := &corev1.PodList{}
//labelSelector := labels.SelectorFromSet(labelsForBroker(broker.Name))
@@ -219,17 +219,17 @@
return reconcile.Result{true, time.Duration(3) * time.Second}, nil
}
-// deploymentForBroker returns a master broker Deployment object
-func (r *ReconcileBroker) deploymentForMasterBroker(m *cachev1alpha1.Broker, brokerClusterIndex int) *appsv1.Deployment {
+// statefulSetForBroker returns a master broker StatefulSet object
+func (r *ReconcileBroker) statefulSetForMasterBroker(m *cachev1alpha1.Broker, brokerClusterIndex int) *appsv1.StatefulSet {
ls := labelsForBroker(m.Name)
var a int32 = 1
var c = &a
- dep := &appsv1.Deployment{
+ dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-" + strconv.Itoa(brokerClusterIndex) + "-master",
Namespace: m.Namespace,
},
- Spec: appsv1.DeploymentSpec{
+ Spec: appsv1.StatefulSetSpec{
Replicas: c,
Selector: &metav1.LabelSelector{
MatchLabels: ls,
@@ -279,17 +279,17 @@
}
-// deploymentForBroker returns a slave broker Deployment object
-func (r *ReconcileBroker) deploymentForSlaveBroker(m *cachev1alpha1.Broker, brokerClusterIndex int, slaveIndex int) *appsv1.Deployment {
+// statefulSetForBroker returns a slave broker StatefulSet object
+func (r *ReconcileBroker) statefulSetForSlaveBroker(m *cachev1alpha1.Broker, brokerClusterIndex int, slaveIndex int) *appsv1.StatefulSet {
ls := labelsForBroker(m.Name)
var a int32 = 1
var c = &a
- dep := &appsv1.Deployment{
+ dep := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: m.Name + "-" + strconv.Itoa(brokerClusterIndex) + "-slave-" + strconv.Itoa(slaveIndex),
Namespace: m.Namespace,
},
- Spec: appsv1.DeploymentSpec{
+ Spec: appsv1.StatefulSetSpec{
Replicas: c,
Selector: &metav1.LabelSelector{
MatchLabels: ls,