| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package controllers |
| |
| import ( |
| "context" |
| "fmt" |
| "reflect" |
| "time" |
| |
| "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1" |
| "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes" |
| "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap" |
| "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/deployment" |
| "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/service" |
| reconcile "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/computenode" |
| |
| "github.com/go-logr/logr" |
| appsv1 "k8s.io/api/apps/v1" |
| corev1 "k8s.io/api/core/v1" |
| apierrors "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/intstr" |
| ctrl "sigs.k8s.io/controller-runtime" |
| "sigs.k8s.io/controller-runtime/pkg/client" |
| ) |
| |
| const ( |
| computeNodeControllerName = "compute-node-controller" |
| defaultRequeueTime = 10 * time.Second |
| ) |
| |
| // ComputeNodeReconciler is a controller for the compute node |
| type ComputeNodeReconciler struct { |
| client.Client |
| Scheme *runtime.Scheme |
| Log logr.Logger |
| |
| Builder reconcile.Builder |
| Resources kubernetes.Resources |
| |
| Deployment deployment.Deployment |
| Service service.Service |
| ConfigMap configmap.ConfigMap |
| } |
| |
| // SetupWithManager sets up the controller with the Manager |
| func (r *ComputeNodeReconciler) SetupWithManager(mgr ctrl.Manager) error { |
| return ctrl.NewControllerManagedBy(mgr). |
| For(&v1alpha1.ComputeNode{}). |
| Owns(&appsv1.Deployment{}). |
| Owns(&corev1.Pod{}). |
| Owns(&corev1.Service{}). |
| Owns(&corev1.ConfigMap{}). |
| Complete(r) |
| } |
| |
| // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=computenodes,verbs=get;list;watch;create;update;patch;delete |
| // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=computenodes/status,verbs=get;update;patch |
| // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete |
| // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete |
| // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete |
| // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete |
| |
// Reconcile is the main reconciliation loop: it fetches the ComputeNode and drives its Deployment, Service, ConfigMap and status towards the desired state
| func (r *ComputeNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { |
| logger := r.Log.WithValues(computeNodeControllerName, req.NamespacedName) |
| |
| cn := &v1alpha1.ComputeNode{} |
| if err := r.Get(ctx, req.NamespacedName, cn); err != nil { |
		if apierrors.IsNotFound(err) {
			// The ComputeNode is gone; do not requeue a deleted object
			return ctrl.Result{}, nil
		}

		logger.Error(err, "Failed to get the compute node")
		// Returning a non-nil error already triggers a requeue with backoff
		return ctrl.Result{}, err
| } |
| |
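	// A status update failure is logged but does not abort the resource reconciliation below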
| if err := r.reconcileStatus(ctx, cn); err != nil { |
| logger.Error(err, "Failed to reconcile status") |
| } |
| |
	errs := []error{}
	if err := r.reconcileDeployment(ctx, cn); err != nil {
		logger.Error(err, "Failed to reconcile deployment")
		errs = append(errs, err)
	}
	if err := r.reconcileService(ctx, cn); err != nil {
		logger.Error(err, "Failed to reconcile service")
		errs = append(errs, err)
	}
	if err := r.reconcileConfigMap(ctx, cn); err != nil {
		logger.Error(err, "Failed to reconcile configmap")
		errs = append(errs, err)
	}

	if len(errs) != 0 {
		// Returning the first error requeues with backoff; the rest have been logged
		return ctrl.Result{}, errs[0]
	}
| |
| return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil |
| } |
| |
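// reconcileDeployment ensures the Deployment backing the given ComputeNode
// exists and matches the desired spec, creating it when absent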
| func (r *ComputeNodeReconciler) reconcileDeployment(ctx context.Context, cn *v1alpha1.ComputeNode) error { |
| deploy, err := r.getDeploymentByNamespacedName(ctx, types.NamespacedName{Namespace: cn.Namespace, Name: cn.Name}) |
| if err != nil { |
| return err |
| } |
| |
| if deploy != nil { |
| return r.updateDeployment(ctx, cn, deploy) |
| } |
| return r.createDeployment(ctx, cn) |
| } |
| |
// createDeployment builds the desired Deployment for the given ComputeNode
// and creates it; an AlreadyExists error is treated as success because a
// concurrent reconcile has already created the object
func (r *ComputeNodeReconciler) createDeployment(ctx context.Context, cn *v1alpha1.ComputeNode) error {
	deploy := r.Builder.BuildDeployment(ctx, cn)
	if err := r.Resources.Deployment().Create(ctx, deploy); err != nil && !apierrors.IsAlreadyExists(err) {
		return err
	}
	return nil
}
| |
// updateDeployment compares the desired Deployment spec with the live one and
// issues an update only when they differ
func (r *ComputeNodeReconciler) updateDeployment(ctx context.Context, cn *v1alpha1.ComputeNode, deploy *appsv1.Deployment) error {
	exp := r.Builder.BuildDeployment(ctx, cn)
	// Reuse the live object's metadata (which already carries its labels and
	// annotations) so the update is applied against the correct resourceVersion
	exp.ObjectMeta = deploy.ObjectMeta

	if !reflect.DeepEqual(deploy.Spec, exp.Spec) {
		return r.Resources.Deployment().Update(ctx, exp)
	}
	return nil
}
| |
// getDeploymentByNamespacedName returns the live Deployment for the given
// namespace/name, or nil if it does not exist
func (r *ComputeNodeReconciler) getDeploymentByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*appsv1.Deployment, error) {
	return r.Resources.Deployment().GetByNamespacedName(ctx, namespacedName)
}
| |
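// reconcileService ensures the Service for the given ComputeNode exists and
// is up to date, creating it when absent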
| func (r *ComputeNodeReconciler) reconcileService(ctx context.Context, cn *v1alpha1.ComputeNode) error { |
| svc, err := r.getServiceByNamespacedName(ctx, types.NamespacedName{Namespace: cn.Namespace, Name: cn.Name}) |
| if err != nil { |
| return err |
| } |
| if svc != nil { |
| return r.updateService(ctx, cn, svc) |
| } |
| return r.createService(ctx, cn) |
| } |
| |
// createService builds the desired Service for the given ComputeNode and
// creates it, tolerating AlreadyExists errors from concurrent reconciles
func (r *ComputeNodeReconciler) createService(ctx context.Context, cn *v1alpha1.ComputeNode) error {
	svc := r.Builder.BuildService(ctx, cn)
	if err := r.Resources.Service().Create(ctx, svc); err != nil && !apierrors.IsAlreadyExists(err) {
		return err
	}
	return nil
}
| |
// updateComputeNodePortBindings writes the mutated port bindings back to the
// live ComputeNode object
func (r *ComputeNodeReconciler) updateComputeNodePortBindings(ctx context.Context, cn *v1alpha1.ComputeNode) error {
	rt, err := r.getRuntimeComputeNode(ctx, types.NamespacedName{
		Namespace: cn.Namespace,
		Name:      cn.Name,
	})
	if err != nil {
		// Surface the lookup failure instead of silently skipping the update
		return err
	}
	rt.Spec.PortBindings = cn.Spec.PortBindings
	return r.Update(ctx, rt)
}
| |
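// updateService first synchronizes the node port bookkeeping in the
// ComputeNode spec with the live Service, then updates the Service itself if
// its spec has drifted from the desired state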
| func (r *ComputeNodeReconciler) updateService(ctx context.Context, cn *v1alpha1.ComputeNode, s *corev1.Service) error { |
	// Snapshot the current port bindings so mutations made below can be detected
	pbs := make([]v1alpha1.PortBinding, len(cn.Spec.PortBindings))
	copy(pbs, cn.Spec.PortBindings)
	switch cn.Spec.ServiceType {
	case corev1.ServiceTypeClusterIP:
		updateServiceClusterIP(cn.Spec.PortBindings)
		if !reflect.DeepEqual(cn.Spec.PortBindings, pbs) {
			return r.updateComputeNodePortBindings(ctx, cn)
		}
	case corev1.ServiceTypeExternalName, corev1.ServiceTypeLoadBalancer, corev1.ServiceTypeNodePort:
		updateServiceNodePort(cn.Spec.PortBindings, s.Spec.Ports)
		if !reflect.DeepEqual(cn.Spec.PortBindings, pbs) {
			return r.updateComputeNodePortBindings(ctx, cn)
		}
	}
| |
| exp := r.Builder.BuildService(ctx, cn) |
| exp.ObjectMeta = s.ObjectMeta |
| exp.Spec.ClusterIP = s.Spec.ClusterIP |
| exp.Spec.ClusterIPs = s.Spec.ClusterIPs |
| |
| if cn.Spec.ServiceType == corev1.ServiceTypeNodePort { |
| exp.Spec.Ports = updateNodePorts(cn.Spec.PortBindings, s.Spec.Ports) |
| } |
| |
| if !reflect.DeepEqual(exp.Spec, s.Spec) { |
| return r.Update(ctx, exp) |
| } |
| return nil |
| } |
| |
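// updateServiceNodePort records node ports already allocated by the live
// Service into port bindings that have none yet, so allocations survive
// rebuilds of the Service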
| func updateServiceNodePort(pbs []v1alpha1.PortBinding, ports []corev1.ServicePort) { |
| for idx := range ports { |
| for i := range pbs { |
| if ports[idx].Name == pbs[i].Name { |
| if pbs[i].NodePort == 0 { |
| pbs[i].NodePort = ports[idx].NodePort |
| } |
| break |
| } |
| } |
| } |
| } |
| |
// updateServiceClusterIP clears every node port recorded in the port
// bindings, since a ClusterIP Service does not allocate node ports
func updateServiceClusterIP(portBindings []v1alpha1.PortBinding) {
	for idx := range portBindings {
		portBindings[idx].NodePort = 0
	}
}
| |
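// updateNodePorts builds the desired ServicePorts from the port bindings,
// preserving any node port the live Service has already been allocated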
| func updateNodePorts(portbindings []v1alpha1.PortBinding, svcports []corev1.ServicePort) []corev1.ServicePort { |
| ports := []corev1.ServicePort{} |
| for pb := range portbindings { |
| for sp := range svcports { |
| if portbindings[pb].Name == svcports[sp].Name { |
| port := corev1.ServicePort{ |
| Name: portbindings[pb].Name, |
| TargetPort: intstr.FromInt(int(portbindings[pb].ContainerPort)), |
| Port: portbindings[pb].ServicePort, |
| Protocol: portbindings[pb].Protocol, |
| } |
| if svcports[sp].NodePort != 0 { |
| port.NodePort = svcports[sp].NodePort |
| } |
| ports = append(ports, port) |
| } |
| } |
| } |
| return ports |
| } |
| |
// getServiceByNamespacedName returns the live Service for the given
// namespace/name, or nil if it does not exist
func (r *ComputeNodeReconciler) getServiceByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*corev1.Service, error) {
	return r.Resources.Service().GetByNamespacedName(ctx, namespacedName)
}
| |
// createConfigMap builds the desired ConfigMap for the given ComputeNode and
// creates it, tolerating AlreadyExists errors from concurrent reconciles
func (r *ComputeNodeReconciler) createConfigMap(ctx context.Context, cn *v1alpha1.ComputeNode) error {
	cm := r.Builder.BuildConfigMap(ctx, cn)
	if err := r.Resources.ConfigMap().Create(ctx, cm); err != nil && !apierrors.IsAlreadyExists(err) {
		return err
	}
	return nil
}
| |
// updateConfigMap compares the desired ConfigMap data with the live one and
// issues an update only when they differ
func (r *ComputeNodeReconciler) updateConfigMap(ctx context.Context, cn *v1alpha1.ComputeNode, cm *corev1.ConfigMap) error {
	exp := r.Builder.BuildConfigMap(ctx, cn)
	// Reuse the live object's metadata (which already carries its labels and
	// annotations) so the update is applied against the correct resourceVersion
	exp.ObjectMeta = cm.ObjectMeta

	if !reflect.DeepEqual(cm.Data, exp.Data) {
		return r.Resources.ConfigMap().Update(ctx, exp)
	}
	return nil
}
| |
// getConfigMapByNamespacedName returns the live ConfigMap for the given
// namespace/name, or nil if it does not exist
func (r *ComputeNodeReconciler) getConfigMapByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*corev1.ConfigMap, error) {
	return r.Resources.ConfigMap().GetByNamespacedName(ctx, namespacedName)
}
| |
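// reconcileConfigMap ensures the ConfigMap for the given ComputeNode exists
// and is up to date, creating it when absent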
| func (r *ComputeNodeReconciler) reconcileConfigMap(ctx context.Context, cn *v1alpha1.ComputeNode) error { |
| cm, err := r.getConfigMapByNamespacedName(ctx, types.NamespacedName{Namespace: cn.Namespace, Name: cn.Name}) |
| if err != nil { |
| return err |
| } |
| if cm != nil { |
| return r.updateConfigMap(ctx, cn, cm) |
| } |
| return r.createConfigMap(ctx, cn) |
| } |
| |
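// reconcileStatus recomputes the ComputeNode status from its Pods and Service
// and writes it back through the status subresource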
| func (r *ComputeNodeReconciler) reconcileStatus(ctx context.Context, cn *v1alpha1.ComputeNode) error { |
| selector, err := metav1.LabelSelectorAsSelector(cn.Spec.Selector) |
	if err != nil {
		// Wrap the conversion error rather than discarding it
		return fmt.Errorf("error converting the ComputeNode label selector: %w", err)
	}
| |
| podlist := &corev1.PodList{} |
	// List with the converted selector so MatchExpressions are honored as well
	if err := r.List(ctx, podlist, client.InNamespace(cn.Namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
| return err |
| } |
| |
| service := &corev1.Service{} |
| if err := r.Get(ctx, types.NamespacedName{ |
| Namespace: cn.Namespace, |
| Name: cn.Name, |
| }, service); err != nil { |
| if !apierrors.IsNotFound(err) { |
| return err |
| } |
| } |
| |
| status := reconcileComputeNodeStatus(podlist, service, cn) |
| rt, err := r.getRuntimeComputeNode(ctx, types.NamespacedName{ |
| Namespace: cn.Namespace, |
| Name: cn.Name, |
| }) |
| if err != nil { |
| return err |
| } |
| status.Selector = selector.String() |
| rt.Status = *status |
| |
| return r.Status().Update(ctx, rt) |
| } |
| |
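// getReadyProxyInstances counts the running Pods whose "shardingsphere-proxy"
// container reports ready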
| func getReadyProxyInstances(podlist *corev1.PodList) int32 { |
| var cnt int32 |
| |
| findRunningPod := func(pod *corev1.Pod) { |
| if pod.Status.Phase != corev1.PodRunning { |
| return |
| } |
| |
| if isTrueReadyPod(pod) { |
| for j := range pod.Status.ContainerStatuses { |
| if pod.Status.ContainerStatuses[j].Name == "shardingsphere-proxy" && pod.Status.ContainerStatuses[j].Ready { |
| cnt++ |
| } |
| } |
| } |
| } |
| |
| for idx := range podlist.Items { |
| findRunningPod(&podlist.Items[idx]) |
| } |
| return cnt |
| } |
| |
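// isTrueReadyPod reports whether the Pod's PodReady condition is True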
| func isTrueReadyPod(pod *corev1.Pod) bool { |
| for i := range pod.Status.Conditions { |
| if pod.Status.Conditions[i].Type == corev1.PodReady && pod.Status.Conditions[i].Status == corev1.ConditionTrue { |
| return true |
| } |
| } |
| return false |
| } |
| |
// updateComputeNodeStatusCondition merges the freshly computed conditions
// into the existing condition list: matching types are updated in place,
// an Unknown condition on either side forces the status to False, and
// condition types not seen before are appended
func updateComputeNodeStatusCondition(conditions []v1alpha1.ComputeNodeCondition, conds []v1alpha1.ComputeNodeCondition) []v1alpha1.ComputeNodeCondition {
	for idx := range conds {
		var found bool
		for i := range conditions {
			if conditions[i].Type == conds[idx].Type {
				found = true
				conditions[i].LastUpdateTime = conds[idx].LastUpdateTime
				conditions[i].Status = conds[idx].Status
				conditions[i].Message = conds[idx].Message
				conditions[i].Reason = conds[idx].Reason
			} else if conds[idx].Type == v1alpha1.ComputeNodeConditionUnknown || conditions[i].Type == v1alpha1.ComputeNodeConditionUnknown {
				conditions[i].Status = v1alpha1.ConditionStatusFalse
			}
		}

		// Append conditions whose type is not present yet
		if !found {
			conditions = append(conditions, conds[idx])
		}
	}

	return conditions
}
| |
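// reconcileComputeNodeStatus derives the ComputeNode status (conditions,
// ready count, replicas, phase and load balancer endpoints) from its Pods
// and Service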
| func reconcileComputeNodeStatus(podlist *corev1.PodList, svc *corev1.Service, cn *v1alpha1.ComputeNode) *v1alpha1.ComputeNodeStatus { |
| conds := reconcile.GetConditionFromPods(podlist) |
| |
| cn.Status.Conditions = updateComputeNodeStatusCondition(cn.Status.Conditions, conds) |
| |
| ready := getReadyProxyInstances(podlist) |
| cn.Status.Ready = fmt.Sprintf("%d/%d", ready, len(podlist.Items)) |
| cn.Status.Replicas = int32(len(podlist.Items)) |
| |
| if ready > 0 { |
| cn.Status.Phase = v1alpha1.ComputeNodeStatusReady |
| } else { |
| cn.Status.Phase = v1alpha1.ComputeNodeStatusNotReady |
| } |
| |
| cn.Status.LoadBalancer.ClusterIP = svc.Spec.ClusterIP |
| cn.Status.LoadBalancer.Ingress = svc.Status.LoadBalancer.Ingress |
| return &cn.Status |
| } |
| |
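// getRuntimeComputeNode fetches the live ComputeNode from the API server so
// that updates are applied against its latest resourceVersion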
| func (r *ComputeNodeReconciler) getRuntimeComputeNode(ctx context.Context, namespacedName types.NamespacedName) (*v1alpha1.ComputeNode, error) { |
| rt := &v1alpha1.ComputeNode{} |
| err := r.Get(ctx, namespacedName, rt) |
| return rt, err |
| } |