| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package controller |
| |
| import ( |
| "context" |
| "errors" |
| dv1 "github.com/apache/doris-operator/api/disaggregated/v1" |
| "github.com/apache/doris-operator/pkg/common/utils/hash" |
| sc "github.com/apache/doris-operator/pkg/controller/sub_controller" |
| dcgs "github.com/apache/doris-operator/pkg/controller/sub_controller/disaggregated_cluster/computegroups" |
| dfe "github.com/apache/doris-operator/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe" |
| "github.com/apache/doris-operator/pkg/controller/sub_controller/disaggregated_cluster/metaservice" |
| appv1 "k8s.io/api/apps/v1" |
| corev1 "k8s.io/api/core/v1" |
| apierrors "k8s.io/apimachinery/pkg/api/errors" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/client-go/tools/record" |
| "k8s.io/client-go/util/retry" |
| "k8s.io/klog/v2" |
| "os" |
| ctrl "sigs.k8s.io/controller-runtime" |
| controller_builder "sigs.k8s.io/controller-runtime/pkg/builder" |
| "sigs.k8s.io/controller-runtime/pkg/client" |
| "sigs.k8s.io/controller-runtime/pkg/event" |
| "sigs.k8s.io/controller-runtime/pkg/handler" |
| "sigs.k8s.io/controller-runtime/pkg/predicate" |
| "sigs.k8s.io/controller-runtime/pkg/reconcile" |
| "sigs.k8s.io/controller-runtime/pkg/source" |
| "time" |
| ) |
| |
| var ( |
| _ reconcile.Reconciler = &DisaggregatedClusterReconciler{} |
| _ Controller = &DisaggregatedClusterReconciler{} |
| ) |
| |
| var ( |
| disaggregatedClusterController = "disaggregatedClusterController" |
| ) |
| |
// DisaggregatedClusterReconciler reconciles DorisDisaggregatedCluster objects by
// delegating the per-component work to the sub controllers held in Scs.
type DisaggregatedClusterReconciler struct {
	// Client is the embedded API client used for the Get/Update/Status calls below.
	client.Client
	// Recorder is the event recorder; Init obtains it via
	// mgr.GetEventRecorderFor(disaggregatedClusterController).
	Recorder record.EventRecorder
	// NOTE(review): Init does not set Scheme and nothing visible here reads it — confirm whether it is needed.
	Scheme *runtime.Scheme
	// Scs holds the sub controllers (metaservice, FE, compute groups) keyed by GetControllerName().
	Scs map[string]sc.DisaggregatedSubController
	// Disabled feature: record configmap response instance.
	// key: configMap namespacedName, value: DorisDisaggregatedCluster namespacedName
	//wcms map[string]string
}
| |
// Init wires the disaggregated-cluster controller into mgr.
//
// It creates the metaservice, disaggregated FE and compute-group sub controllers
// (keyed by their controller names). It builds a new DisaggregatedClusterReconciler
// around them and registers that reconciler with mgr. When options.EnableWebHook
// is set, it also registers the DorisDisaggregatedCluster webhook. Any setup
// failure is logged and terminates the process with exit code 1. The receiver
// itself is neither modified nor registered.
func (dc *DisaggregatedClusterReconciler) Init(mgr ctrl.Manager, options *Options) {
	subControllers := make(map[string]sc.DisaggregatedSubController)
	for _, sub := range []sc.DisaggregatedSubController{
		metaservice.New(mgr),
		dfe.New(mgr),
		dcgs.New(mgr),
	} {
		subControllers[sub.GetControllerName()] = sub
	}

	reconciler := &DisaggregatedClusterReconciler{
		Client:   mgr.GetClient(),
		Recorder: mgr.GetEventRecorderFor(disaggregatedClusterController),
		Scs:      subControllers,
	}
	if err := reconciler.SetupWithManager(mgr); err != nil {
		klog.Error(err, "unable to create controller ", "disaggregatedClusterReconciler")
		os.Exit(1)
	}

	if !options.EnableWebHook {
		return
	}
	if err := (&dv1.DorisDisaggregatedCluster{}).SetupWebhookWithManager(mgr); err != nil {
		klog.Error(err, " unable to create unnamedwatches ", " controller ", " DorisDisaggregatedCluster ")
		os.Exit(1)
	}
}
| |
| func (dc *DisaggregatedClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { |
| builder := dc.resourceBuilder(ctrl.NewControllerManagedBy(mgr)) |
| builder = dc.watchPodBuilder(builder) |
| //builder = dc.watchConfigMapBuilder(builder) |
| return builder.Complete(dc) |
| } |
| |
| func (dc *DisaggregatedClusterReconciler) watchPodBuilder(builder *ctrl.Builder) *ctrl.Builder { |
| mapFn := handler.EnqueueRequestsFromMapFunc( |
| func(a client.Object) []reconcile.Request { |
| labels := a.GetLabels() |
| dorisName := labels[dv1.DorisDisaggregatedClusterName] |
| if dorisName != "" { |
| return []reconcile.Request{ |
| {NamespacedName: types.NamespacedName{ |
| Name: dorisName, |
| Namespace: a.GetNamespace(), |
| }}, |
| } |
| } |
| |
| return nil |
| }) |
| |
| p := predicate.Funcs{ |
| CreateFunc: func(e event.CreateEvent) bool { |
| if _, ok := e.Object.GetLabels()[dv1.DorisDisaggregatedClusterName]; !ok { |
| return false |
| } |
| |
| return true |
| }, |
| UpdateFunc: func(u event.UpdateEvent) bool { |
| if _, ok := u.ObjectOld.GetLabels()[dv1.DorisDisaggregatedClusterName]; !ok { |
| return false |
| } |
| |
| return u.ObjectOld != u.ObjectNew |
| }, |
| DeleteFunc: func(d event.DeleteEvent) bool { |
| if _, ok := d.Object.GetLabels()[dv1.DorisDisaggregatedClusterName]; !ok { |
| return false |
| } |
| |
| return true |
| }, |
| } |
| |
| return builder.Watches(&source.Kind{Type: &corev1.Pod{}}, |
| mapFn, controller_builder.WithPredicates(p)) |
| } |
| |
| //func (dc *DisaggregatedClusterReconciler) watchConfigMapBuilder(builder *ctrl.Builder) *ctrl.Builder { |
| // mapFn := handler.EnqueueRequestsFromMapFunc( |
| // func(a client.Object) []reconcile.Request { |
| // namespace := a.GetNamespace() |
| // name := a.GetName() |
| // cmnn := types.NamespacedName{Namespace: namespace, Name: name} |
| // cmnnStr := cmnn.String() |
| // if ddc, ok := dc.wcms[cmnnStr]; ok { |
| // nna := strings.Split(ddc, "/") |
| // // not run only for code standard |
| // if len(nna) != 2 { |
| // return nil |
| // } |
| // |
| // return []reconcile.Request{{NamespacedName: types.NamespacedName{ |
| // Namespace: nna[0], |
| // Name: nna[1], |
| // }}} |
| // } |
| // return nil |
| // }) |
| // |
| // p := predicate.Funcs{ |
| // UpdateFunc: func(u event.UpdateEvent) bool { |
| // ns := u.ObjectNew.GetNamespace() |
| // name := u.ObjectNew.GetName() |
| // nsn := ns + "/" + name |
| // _, ok := dc.wcms[nsn] |
| // return ok |
| // }, |
| // } |
| // |
| // return builder.Watches(&source.Kind{Type: &corev1.ConfigMap{}}, |
| // mapFn, controller_builder.WithPredicates(p)) |
| //} |
| |
| func (dc *DisaggregatedClusterReconciler) resourceBuilder(builder *ctrl.Builder) *ctrl.Builder { |
| return builder.For(&dv1.DorisDisaggregatedCluster{}). |
| Owns(&appv1.StatefulSet{}). |
| Owns(&corev1.Service{}) |
| } |
| |
// Reconcile drives one DorisDisaggregatedCluster towards its desired state.
//
// Steps:
//  1. Fetch the cluster named by req. A missing object ends the reconcile
//     without error. Any other read error is returned so the request is
//     retried, rather than reconciling an empty object.
//  2. Sync resources through every registered sub controller.
//  3. Clear resources that are no longer needed.
//  4. Display the new status: reorganize the status, then update the CR or
//     only its status.
//
// Steps 2-4 always all run. Their error messages are concatenated into a single
// returned error, so a failing step leads to a retry instead of being dropped.
// When nothing failed, the result of the status update is returned; that result
// may request a delayed requeue.
func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	var ddc dv1.DorisDisaggregatedCluster
	if err := dc.Get(ctx, req.NamespacedName, &ddc); err != nil {
		if apierrors.IsNotFound(err) {
			klog.Warningf("disaggreatedClusterReconciler not find resource DorisDisaggregatedCluster namespaceName %s", req.NamespacedName)
			return ctrl.Result{}, nil
		}
		// Transient read failure: do not reconcile against a zero-value object.
		return ctrl.Result{}, err
	}
	// Hash the spec before any sub controller touches it. updateObjectORStatus
	// compares against this to decide whether the whole CR must be written back.
	hv := hash.HashObject(ddc.Spec)

	////TODO: deprecated.
	//cmnn := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Spec.InstanceConfigMap}
	//ddcnn := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}
	//cmnnStr := cmnn.String()
	//ddcnnStr := ddcnn.String()
	//if _, ok := dc.wcms[cmnnStr]; !ok {
	//	dc.wcms[cmnnStr] = ddcnnStr
	//}

	var res ctrl.Result
	var errs []error

	// sync resources. Every sub controller is called even if an earlier one failed.
	if reconRes, reconErr := dc.reconcileSub(ctx, &ddc); reconErr != nil {
		errs = append(errs, reconErr)
		res = reconRes
	}

	// clear unused resources.
	if clearRes, clearErr := dc.clearUnusedResources(ctx, &ddc); clearErr != nil {
		errs = append(errs, clearErr)
		res = clearRes
	}

	// display new status. The closure uses only its own locals so it cannot
	// overwrite res or the errors collected above.
	disRes, disErr := func() (ctrl.Result, error) {
		// reorganize status.
		if stsRes, stsErr := dc.reorganizeStatus(&ddc); stsErr != nil {
			return stsRes, stsErr
		}
		// update cr or status; its result may carry a RequeueAfter.
		return dc.updateObjectORStatus(ctx, &ddc, hv)
	}()
	if disErr != nil {
		errs = append(errs, disErr)
		res = disRes
	}

	if len(errs) != 0 {
		msg := ""
		for _, err := range errs {
			msg += err.Error()
		}
		return res, errors.New(msg)
	}
	return disRes, nil
}
| |
// clearUnusedResources calls ClearResources on every registered sub controller
// for ddc. Whatever ClearResources returns is discarded, so this method always
// reports an empty result and a nil error.
// NOTE(review): if ClearResources can report failures, they are currently dropped here — confirm this is intended.
func (dc *DisaggregatedClusterReconciler) clearUnusedResources(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
	for name := range dc.Scs {
		dc.Scs[name].ClearResources(ctx, ddc)
	}
	return ctrl.Result{}, nil
}
| |
| func (dc *DisaggregatedClusterReconciler) reorganizeStatus(ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) { |
| for _, sc := range dc.Scs { |
| //update component status. |
| if err := sc.UpdateComponentStatus(ddc); err != nil { |
| klog.Errorf("DorisClusterReconciler reconcile update component %s status failed.err=%s\n", sc.GetControllerName(), err.Error()) |
| return requeueIfError(err) |
| } |
| } |
| |
| ddc.Status.ClusterHealth.Health = dv1.Green |
| if ddc.Status.FEStatus.AvailableStatus != dv1.Available || ddc.Status.ClusterHealth.CGAvailableCount <= (ddc.Status.ClusterHealth.CGCount/2) { |
| ddc.Status.ClusterHealth.Health = dv1.Red |
| } else if ddc.Status.FEStatus.Phase != dv1.Ready || ddc.Status.ClusterHealth.CGAvailableCount < ddc.Status.ClusterHealth.CGCount { |
| ddc.Status.ClusterHealth.Health = dv1.Yellow |
| } |
| return ctrl.Result{}, nil |
| } |
| |
| func (dc *DisaggregatedClusterReconciler) reconcileSub(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) { |
| // recall all sub for check error. |
| errs := []error{} |
| for _, subC := range dc.Scs { |
| if err := subC.Sync(ctx, ddc); err != nil { |
| klog.Errorf("disaggreatedClusterReconciler sub reconciler %s sync err=%s.", subC.GetControllerName(), err.Error()) |
| errs = append(errs, err) |
| } |
| } |
| |
| if len(errs) != 0 { |
| msg := "" |
| for _, err := range errs { |
| msg += err.Error() |
| } |
| return ctrl.Result{}, errors.New(msg) |
| } |
| return ctrl.Result{}, nil |
| } |
| |
// updateObjectORStatus persists the outcome of a reconcile.
//
// preHv is the spec hash taken before reconciling. If the current spec hash
// differs (the spec was changed during reconcile), the whole CR is written back
// with Update. That write uses the stored ResourceVersion when one could be read.
// An Update failure is only logged. In every case the status is then written
// from a deep copy of ddc taken before Update. Presumably this is because
// Update may replace ddc's contents with the server response; confirm.
func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster, preHv string) (ctrl.Result, error) {
	postHv := hash.HashObject(ddc.Spec)
	snapshot := ddc.DeepCopy()
	if postHv == preHv {
		return dc.updateDorisDisaggregatedClusterStatus(ctx, snapshot)
	}

	var stored dv1.DorisDisaggregatedCluster
	key := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}
	if err := dc.Get(ctx, key, &stored); (err == nil || !apierrors.IsNotFound(err)) && stored.ResourceVersion != "" {
		ddc.ResourceVersion = stored.ResourceVersion
	}
	if err := dc.Update(ctx, ddc); err != nil {
		// Best-effort: the failure is logged and the status update still runs.
		klog.Errorf("disaggreatedClusterReconciler update DorisDisaggregatedCluster namespace %s name %s failed, err=%s", ddc.Namespace, ddc.Name, err.Error())
	}
	return dc.updateDorisDisaggregatedClusterStatus(ctx, snapshot)
}
| |
// updateDorisDisaggregatedClusterStatus copies ddc.Status onto the latest stored
// DorisDisaggregatedCluster and writes it through the status subresource. Each
// attempt re-reads the object and retries on conflicts with
// retry.DefaultBackoff. A final failure is logged but not returned.
//
// Afterwards ddc.Status and the fetched object are passed to
// disAggregatedInconsistentStatus (defined elsewhere). When that returns false,
// a requeue after 5 seconds is requested; otherwise an empty result is
// returned. The error return is always nil.
func (dc *DisaggregatedClusterReconciler) updateDorisDisaggregatedClusterStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
	key := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}
	var eddc dv1.DorisDisaggregatedCluster
	writeStatus := func() error {
		if err := dc.Get(ctx, key, &eddc); err != nil {
			return err
		}
		ddc.Status.DeepCopyInto(&eddc.Status)
		return dc.Status().Update(ctx, &eddc)
	}
	if err := retry.RetryOnConflict(retry.DefaultBackoff, writeStatus); err != nil {
		klog.Errorf("updateDorisDisaggregatedClusterStatus update status failed err: %s", err.Error())
	}

	// Intent per the original author: requeue when the status changed during
	// reconcile and is not yet available.
	// NOTE(review): exact semantics live in disAggregatedInconsistentStatus — confirm.
	if disAggregatedInconsistentStatus(&ddc.Status, &eddc) {
		return ctrl.Result{}, nil
	}
	return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}