| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package controllers |
| |
| import ( |
| "context" |
| "fmt" |
| "reflect" |
| "strings" |
| |
| "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1" |
| cloudnativepg "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/cloudnative-pg" |
| "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/service" |
| "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws" |
| "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/shardingsphere" |
| |
| cnpg "github.com/cloudnative-pg/cloudnative-pg/api/v1" |
| cnpgutils "github.com/cloudnative-pg/cloudnative-pg/pkg/utils" |
| dbmeshaws "github.com/database-mesh/golang-sdk/aws" |
| dbmeshawsrds "github.com/database-mesh/golang-sdk/aws/client/rds" |
| "github.com/go-logr/logr" |
| corev1 "k8s.io/api/core/v1" |
| apierrors "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/client-go/tools/record" |
| "k8s.io/utils/strings/slices" |
| ctrl "sigs.k8s.io/controller-runtime" |
| "sigs.k8s.io/controller-runtime/pkg/client" |
| "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" |
| ) |
| |
| const ( |
| StorageNodeControllerName = "storage-node-controller" |
| FinalizerName = "shardingsphere.apache.org/finalizer" |
| |
| AnnotationKeyRegisterStorageUnitEnabled = "shardingsphere.apache.org/register-storage-unit-enabled" |
| AnnotationKeyComputeNodeName = "shardingsphere.apache.org/compute-node-name" |
| AnnotationKeyLogicDatabaseName = "shardingsphere.apache.org/logic-database-name" |
| |
| ShardingSphereProtocolType = "proxy-frontend-database-protocol-type" |
| ) |
| |
| // StorageNodeReconciler is a controller for storage nodes |
| type StorageNodeReconciler struct { |
| client.Client |
| Scheme *runtime.Scheme |
| Log logr.Logger |
| Recorder record.EventRecorder |
| Service service.Service |
| |
| AwsRegion string |
| AwsAccessKeyID string |
| AwsSecretAccessKey string |
| AwsSessions dbmeshaws.Sessions |
| |
| CNPG cloudnativepg.CloudNativePG |
| } |
| |
| // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes,verbs=get;list;watch;create;update;patch;delete |
| // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes/status,verbs=get;update;patch |
| // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes/finalizers,verbs=update |
| // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storageproviders,verbs=get;list;watch |
| // +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete |
| // +kubebuilder:rbac:groups="",resources=event,verbs=create;patch |
| |
// Reconcile implements the main reconciliation loop for StorageNode objects
| // nolint:gocognit |
| func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { |
| r.Log.Info(fmt.Sprintf("Reconciling StorageNode %s", req.NamespacedName)) |
| |
| // get storage node |
| node := &v1alpha1.StorageNode{} |
| if err := r.Get(ctx, req.NamespacedName, node); err != nil { |
| return ctrl.Result{}, client.IgnoreNotFound(err) |
| } |
| |
	// get the StorageProvider referenced by node.Spec.StorageProviderName
| storageProvider, err := r.getStorageProvider(ctx, node) |
| if err != nil { |
| r.Log.Error(err, fmt.Sprintf("unable to fetch storageProvider %s", node.Spec.StorageProviderName)) |
| return ctrl.Result{Requeue: true}, err |
| } |
| |
	// handle the finalizer of this storage node
	if node.ObjectMeta.DeletionTimestamp.IsZero() {
		// The object is not being deleted, so if it does not have our finalizer,
		// let's add the finalizer and update the object. This is equivalent to registering our finalizer.
| if !slices.Contains(node.ObjectMeta.Finalizers, FinalizerName) { |
| node.ObjectMeta.Finalizers = append(node.ObjectMeta.Finalizers, FinalizerName) |
| if err := r.Update(ctx, node); err != nil { |
| return ctrl.Result{}, err |
| } |
| } |
| } else if slices.Contains(node.ObjectMeta.Finalizers, FinalizerName) { |
| return r.finalize(ctx, node, storageProvider) |
| } |
| |
| // reconcile storage node |
| return r.reconcile(ctx, storageProvider, node) |
| } |
| |
// finalizeCloudNativePG deletes the backing CloudNativePG Cluster, if any, and then removes the finalizer from the storage node.
func (r *StorageNodeReconciler) finalizeCloudNativePG(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (ctrl.Result, error) {
| namespacedName := types.NamespacedName{ |
| Name: node.Name, |
| Namespace: node.Namespace, |
| } |
| |
| cluster, err := r.CNPG.GetClusterByNamespacedName(ctx, namespacedName) |
| if err != nil { |
| return ctrl.Result{Requeue: true}, err |
| } |
| |
| if cluster != nil { |
| if err := r.CNPG.Delete(ctx, cluster); err != nil { |
| return ctrl.Result{Requeue: true}, err |
| } |
| } |
| |
	controllerutil.RemoveFinalizer(node, FinalizerName)
| if err := r.Update(ctx, node); err != nil { |
| return ctrl.Result{}, err |
| } |
| |
| return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil |
| } |
| |
// finalize drives the deletion flow of a storage node: it marks the node as deleting, tears down the backing database resources, and removes the finalizer once deletion completes.
func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (ctrl.Result, error) {
| var err error |
| var oldStatus = node.Status.DeepCopy() |
| |
| switch node.Status.Phase { |
| case v1alpha1.StorageNodePhaseReady, v1alpha1.StorageNodePhaseNotReady: |
| // set storage node status to deleting |
| node.Status.Phase = v1alpha1.StorageNodePhaseDeleting |
	case v1alpha1.StorageNodePhaseDeleting:
		// already deleting; proceed to tear down resources below
| case v1alpha1.StorageNodePhaseDeleteComplete: |
| node.ObjectMeta.Finalizers = slices.Filter([]string{}, node.ObjectMeta.Finalizers, func(f string) bool { |
| return f != FinalizerName |
| }) |
| if err = r.Update(ctx, node); err != nil { |
| r.Log.Error(err, "failed to remove finalizer") |
| } |
| return ctrl.Result{}, nil |
| } |
| |
| if storageProvider.Spec.Provisioner == v1alpha1.ProvisionerCloudNativePG { |
| return r.finalizeCloudNativePG(ctx, node, storageProvider) |
| } |
| |
| // Try to unregister storage unit in shardingsphere. |
| if err = r.unregisterStorageUnit(ctx, node); err != nil { |
| r.Log.Error(err, "failed to delete storage unit") |
| return ctrl.Result{RequeueAfter: defaultRequeueTime}, err |
| } |
| |
| if err = r.deleteDatabaseCluster(ctx, node, storageProvider); err != nil { |
| r.Log.Error(err, "failed to delete database cluster") |
| return ctrl.Result{RequeueAfter: defaultRequeueTime}, err |
| } |
| |
| desiredState := computeDesiredState(node.Status) |
| |
	if !reflect.DeepEqual(*oldStatus, desiredState) {
| node.Status = desiredState |
| err := r.Status().Update(ctx, node) |
| if err != nil { |
| r.Log.Error(err, fmt.Sprintf("unable to update StorageNode %s/%s", node.GetNamespace(), node.GetName())) |
| return ctrl.Result{Requeue: true}, err |
| } |
| } |
| |
| return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil |
| } |
| |
// reconcile provisions or updates the backing database for the storage node, registers it as a storage unit when requested, and refreshes the node status.
func (r *StorageNodeReconciler) reconcile(ctx context.Context, storageProvider *v1alpha1.StorageProvider, node *v1alpha1.StorageNode) (ctrl.Result, error) {
	oldStatus := node.Status.DeepCopy()
| |
	// reconcile the storage node according to the provisioner declared by the StorageProvider
	switch storageProvider.Spec.Provisioner {
	case v1alpha1.ProvisionerAWSRDSInstance:
		if err := r.reconcileAwsRdsInstance(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "ReconcileFailed", "unable to reconcile AWS RDS Instance %s/%s, err: %s", node.GetNamespace(), node.GetName(), err.Error())
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	case v1alpha1.ProvisionerAWSRDSCluster:
		if err := r.reconcileAwsRDSCluster(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "ReconcileFailed", "unable to reconcile AWS RDS Cluster %s/%s, err: %s", node.GetNamespace(), node.GetName(), err.Error())
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	case v1alpha1.ProvisionerAWSAurora:
		if err := r.reconcileAwsAurora(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "ReconcileFailed", "unable to reconcile AWS Aurora %s/%s, err: %s", node.GetNamespace(), node.GetName(), err.Error())
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	case v1alpha1.ProvisionerCloudNativePG:
		if err := r.reconcileCloudNativePG(ctx, node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "ReconcileFailed", "unable to reconcile CloudNativePG cluster %s/%s, err: %s", node.GetNamespace(), node.GetName(), err.Error())
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	default:
		r.Recorder.Event(node, corev1.EventTypeWarning, "UnsupportedDatabaseProvisioner", fmt.Sprintf("unsupported database provisioner %s", storageProvider.Spec.Provisioner))
		return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
	}
| |
| // register storage unit if needed. |
| if err := r.registerStorageUnit(ctx, node, storageProvider); err != nil { |
| r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterStorageUnitFailed", "unable to register storage unit %s/%s", node.GetNamespace(), node.GetName()) |
| return ctrl.Result{Requeue: true}, err |
| } |
| |
| desiredState := computeDesiredState(node.Status) |
| |
	if !reflect.DeepEqual(*oldStatus, desiredState) {
| node.Status = desiredState |
| err := r.Status().Update(ctx, node) |
| if err != nil { |
| r.Log.Error(err, fmt.Sprintf("unable to update StorageNode %s/%s", node.GetNamespace(), node.GetName())) |
| return ctrl.Result{Requeue: true}, err |
| } |
| } |
| |
| return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil |
| } |
| |
// getStorageProvider fetches the StorageProvider referenced by the storage node and validates the prerequisites for its provisioner.
func (r *StorageNodeReconciler) getStorageProvider(ctx context.Context, node *v1alpha1.StorageNode) (storageProvider *v1alpha1.StorageProvider, err error) {
	if node.Spec.StorageProviderName == "" {
		r.Recorder.Event(node, corev1.EventTypeWarning, "StorageProviderNameEmpty", "storageProviderName is empty")
		return nil, fmt.Errorf("storageProviderName is empty")
	}
| |
| storageProvider = &v1alpha1.StorageProvider{} |
| |
| if err := r.Get(ctx, client.ObjectKey{Name: node.Spec.StorageProviderName}, storageProvider); err != nil { |
| r.Log.Error(err, fmt.Sprintf("unable to fetch storageProvider %s", node.Spec.StorageProviderName)) |
| r.Recorder.Event(node, corev1.EventTypeWarning, "storageProviderNotFound", fmt.Sprintf("storageProvider %s not found", node.Spec.StorageProviderName)) |
| return nil, err |
| } |
| |
	// check the provisioner:
	// AWS-based provisioners require AWS credentials for the RDS client
| if storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSInstance || |
| storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSAurora || |
| storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSCluster { |
| if r.AwsRegion == "" || r.AwsAccessKeyID == "" || r.AwsSecretAccessKey == "" { |
| r.Recorder.Eventf(node, corev1.EventTypeWarning, "awsCredentialsNotSet", "aws credentials not set") |
| return nil, fmt.Errorf("aws credentials not set") |
| } |
| } |
| |
| return storageProvider, nil |
| } |
| |
// computeDesiredState derives the desired phase and conditions of a storage node from its current status.
// nolint:gocritic
func computeDesiredState(status v1alpha1.StorageNodeStatus) v1alpha1.StorageNodeStatus {
| // Initialize a new status object based on the current state |
| desiredState := status |
| clusterStatus := status.Cluster.Status |
| |
| if status.Phase == v1alpha1.StorageNodePhaseDeleting { |
| // If the storage node is being deleted, check if all instances are deleted. |
| if clusterStatus == "" && len(status.Instances) == 0 { |
| desiredState.Phase = v1alpha1.StorageNodePhaseDeleteComplete |
| } |
| } else { |
| // If the storage node is not being deleted, check if all instances are ready. |
| if (clusterStatus == "" || clusterStatus == string(dbmeshawsrds.DBClusterStatusAvailable)) && allInstancesReady(status.Instances) { |
| desiredState.Phase = v1alpha1.StorageNodePhaseReady |
| } else { |
| desiredState.Phase = v1alpha1.StorageNodePhaseNotReady |
| } |
| } |
| |
| desiredState.Conditions = computeNewConditions(desiredState, status, clusterStatus) |
| |
| return desiredState |
| } |
| |
// computeNewConditions recalculates the storage node conditions (ClusterReady, Available, Registered) from the desired phase and current status.
// nolint:gocritic
func computeNewConditions(desiredState, status v1alpha1.StorageNodeStatus, clusterStatus string) v1alpha1.StorageNodeConditions {
| newSNConditions := status.Conditions |
| |
| // Update the cluster ready condition if the cluster status is not empty |
| if clusterStatus != "" { |
| if clusterStatus == string(dbmeshawsrds.DBClusterStatusAvailable) { |
| newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{ |
| Type: v1alpha1.StorageNodeConditionTypeClusterReady, |
| Status: corev1.ConditionTrue, |
| LastUpdateTime: metav1.Now(), |
| Reason: "Cluster is ready", |
| }) |
| } else { |
| newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{ |
| Type: v1alpha1.StorageNodeConditionTypeClusterReady, |
| Status: corev1.ConditionFalse, |
| LastUpdateTime: metav1.Now(), |
| Reason: "Cluster is not ready", |
| }) |
| } |
| } else { |
| newSNConditions.RemoveCondition(v1alpha1.StorageNodeConditionTypeClusterReady) |
| } |
| |
| // Update the available condition based on the phase |
| if desiredState.Phase == v1alpha1.StorageNodePhaseReady { |
| newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{ |
| Type: v1alpha1.StorageNodeConditionTypeAvailable, |
| Status: corev1.ConditionTrue, |
| LastUpdateTime: metav1.Now(), |
| Reason: "All instances are ready", |
| }) |
| } else { |
| newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{ |
| Type: v1alpha1.StorageNodeConditionTypeAvailable, |
| Status: corev1.ConditionFalse, |
| LastUpdateTime: metav1.Now(), |
| Reason: "One or more instances are not ready", |
| }) |
| } |
| |
| // Update the registered condition |
| if status.Registered { |
| newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{ |
| Type: v1alpha1.StorageNodeConditionTypeRegistered, |
| Status: corev1.ConditionTrue, |
| LastUpdateTime: metav1.Now(), |
| Reason: "StorageNode is registered", |
| }) |
| } else { |
| newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{ |
| Type: v1alpha1.StorageNodeConditionTypeRegistered, |
| Status: corev1.ConditionFalse, |
| LastUpdateTime: metav1.Now(), |
| Reason: "StorageNode is not registered", |
| }) |
| } |
| return newSNConditions |
| } |
| |
| // allInstancesReady returns true if all instances are ready, false otherwise |
| func allInstancesReady(instances []v1alpha1.InstanceStatus) bool { |
| if len(instances) == 0 { |
| return false |
| } |
| |
	for idx := range instances {
		if instances[idx].Status != string(dbmeshawsrds.DBInstanceStatusAvailable) {
			return false
		}
	}
| |
| return true |
| } |
| |
// getAwsRdsClient returns an RDS client backed by a cached AWS session for the configured region, creating the session on first use.
func (r *StorageNodeReconciler) getAwsRdsClient() aws.IRdsClient {
| if _, ok := r.AwsSessions[r.AwsRegion]; !ok { |
| sessions := dbmeshaws.NewSessions().SetCredential(r.AwsRegion, r.AwsAccessKeyID, r.AwsSecretAccessKey).Build() |
| r.AwsSessions = sessions |
| } |
| return aws.NewRdsClient(dbmeshawsrds.NewService(r.AwsSessions[r.AwsRegion])) |
| } |
| |
// reconcileAwsRdsInstance ensures the AWS RDS instance for this node exists and mirrors its status onto the node.
func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
| instance, err := rdsClient.GetInstance(ctx, node) |
| if err != nil { |
| return err |
| } |
| |
| if instance == nil && node.Status.Phase != v1alpha1.StorageNodePhaseDeleting { |
| err = rdsClient.CreateInstance(ctx, node, storageProvider.Spec.Parameters) |
| if err != nil { |
| return err |
| } |
| |
| instance, err = rdsClient.GetInstance(ctx, node) |
| if err != nil { |
| return err |
| } |
| } |
| |
| if err := updateAWSRDSInstanceStatus(node, instance); err != nil { |
| return fmt.Errorf("updateAWSRDSInstanceStatus failed: %w", err) |
| } |
| |
| return nil |
| } |
| |
// updateAWSRDSInstanceStatus mirrors the described RDS instance onto the node status, clearing the instance list when the instance no longer exists.
func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *dbmeshawsrds.DescInstance) error {
| instances := make([]v1alpha1.InstanceStatus, 0) |
| |
| if instance == nil { |
| node.Status.Instances = instances |
| return nil |
| } |
| |
| instances = append(instances, v1alpha1.InstanceStatus{ |
| Endpoint: v1alpha1.Endpoint{ |
| Address: instance.Endpoint.Address, |
| Port: instance.Endpoint.Port, |
| }, |
| Status: string(instance.DBInstanceStatus), |
| }) |
| |
| node.Status.Instances = instances |
| return nil |
| } |
| |
// reconcileAwsRDSCluster ensures the AWS RDS cluster for this node exists and mirrors its status onto the node.
// nolint:dupl
func (r *StorageNodeReconciler) reconcileAwsRDSCluster(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
| rc, err := rdsClient.GetRDSCluster(ctx, node) |
| if err != nil { |
| return err |
| } |
| |
	if rc == nil {
		// create the RDS cluster
| err = rdsClient.CreateRDSCluster(ctx, node, storageProvider.Spec.Parameters) |
| if err != nil { |
| return err |
| } |
| rc, err = rdsClient.GetRDSCluster(ctx, node) |
| if err != nil { |
| return err |
| } |
| } |
| |
| // update storage node status |
| if err := updateClusterStatus(ctx, rdsClient, node, rc); err != nil { |
| return fmt.Errorf("updateClusterStatus failed: %w", err) |
| } |
| |
| return nil |
| } |
| |
// reconcileAwsAurora ensures the AWS Aurora cluster for this node exists and mirrors its status onto the node.
// nolint:dupl
func (r *StorageNodeReconciler) reconcileAwsAurora(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
| ac, err := rdsClient.GetAuroraCluster(ctx, node) |
| if err != nil { |
| return err |
| } |
| |
	if ac == nil {
		// create the Aurora cluster
| err = rdsClient.CreateAuroraCluster(ctx, node, storageProvider.Spec.Parameters) |
| if err != nil { |
| return err |
| } |
| ac, err = rdsClient.GetAuroraCluster(ctx, node) |
| if err != nil { |
| return err |
| } |
| } |
| |
| // update storage node status |
| if err := updateClusterStatus(ctx, rdsClient, node, ac); err != nil { |
| return fmt.Errorf("updateClusterStatus failed: %w", err) |
| } |
| |
| return nil |
| } |
| |
// updateClusterStatus mirrors the described AWS cluster and its member instances onto the node status.
func updateClusterStatus(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, cluster *dbmeshawsrds.DescCluster) error {
| // update cluster status |
| clusterStatus := v1alpha1.ClusterStatus{} |
| if cluster != nil { |
| clusterStatus = v1alpha1.ClusterStatus{ |
| Status: cluster.Status, |
| PrimaryEndpoint: v1alpha1.Endpoint{ |
| Address: cluster.PrimaryEndpoint, |
| Port: cluster.Port, |
| }, |
| ReaderEndpoints: []v1alpha1.Endpoint{ |
| { |
| Address: cluster.ReaderEndpoint, |
| Port: cluster.Port, |
| }, |
| }, |
| } |
| } |
| node.Status.Cluster = clusterStatus |
| |
| // update instances status |
| identifier := node.Annotations[v1alpha1.AnnotationsClusterIdentifier] |
| filters := map[string][]string{ |
| "db-cluster-id": {identifier}, |
| } |
| instances, err := rdsClient.GetInstancesByFilters(ctx, filters) |
| if err != nil { |
| return fmt.Errorf("GetInstances failed, err:%w", err) |
| } |
| |
| var instanceStatus []v1alpha1.InstanceStatus |
| for _, instance := range instances { |
| instanceStatus = append(instanceStatus, v1alpha1.InstanceStatus{ |
| Status: string(instance.DBInstanceStatus), |
| Endpoint: v1alpha1.Endpoint{ |
| Address: instance.Endpoint.Address, |
| Port: instance.Endpoint.Port, |
| }}) |
| } |
| node.Status.Instances = instanceStatus |
| return nil |
| } |
| |
// deleteDatabaseCluster deletes the backing database cluster or instance according to the provisioner declared by the StorageProvider.
func (r *StorageNodeReconciler) deleteDatabaseCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
| switch storageProvider.Spec.Provisioner { |
| case v1alpha1.ProvisionerAWSRDSInstance: |
| if err := r.deleteAWSRDSInstance(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil { |
| return fmt.Errorf("delete aws rds instance failed: %w", err) |
| } |
| case v1alpha1.ProvisionerAWSRDSCluster: |
| if err := r.deleteAWSRDSCluster(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil { |
| return fmt.Errorf("delete aws rds cluster failed: %w", err) |
| } |
| case v1alpha1.ProvisionerAWSAurora: |
| if err := r.deleteAWSAurora(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil { |
| return fmt.Errorf("delete aws aurora cluster failed: %w", err) |
| } |
| default: |
| return fmt.Errorf("unsupported database provisioner %s", storageProvider.Spec.Provisioner) |
| } |
| return nil |
| } |
| |
// deleteAWSRDSInstance requests deletion of the backing RDS instance, if it exists and is not already deleting, and refreshes the node status.
func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
| if node.Annotations[v1alpha1.AnnotationsInstanceIdentifier] == "" { |
| return nil |
| } |
| |
| instance, err := rdsClient.GetInstance(ctx, node) |
| if err != nil { |
| return err |
| } |
| |
| if instance != nil && instance.DBInstanceStatus != dbmeshawsrds.DBInstanceStatusDeleting { |
| if err := rdsClient.DeleteInstance(ctx, node, storageProvider); err != nil { |
| r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete instance %s: %s", node.Annotations[v1alpha1.AnnotationsInstanceIdentifier], err.Error()) |
| return err |
| } |
| r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("instance %s is deleting", node.Annotations[v1alpha1.AnnotationsInstanceIdentifier])) |
| } |
| |
| // update instance status |
| if err := updateAWSRDSInstanceStatus(node, instance); err != nil { |
| return fmt.Errorf("updateAWSRDSInstanceStatus failed: %w", err) |
| } |
| |
| return nil |
| } |
| |
// deleteAWSRDSCluster requests deletion of the backing RDS cluster, if it exists and is not already deleting, and refreshes the node status.
// nolint:dupl
func (r *StorageNodeReconciler) deleteAWSRDSCluster(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
| if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" { |
| return nil |
| } |
| |
| cluster, err := rdsClient.GetRDSCluster(ctx, node) |
| if err != nil { |
| return fmt.Errorf("get rds cluster failed: %w", err) |
| } |
| if cluster != nil && cluster.Status != string(dbmeshawsrds.DBClusterStatusDeleting) { |
| if err := rdsClient.DeleteRDSCluster(ctx, node, storageProvider); err != nil { |
| r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete rds cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error()) |
| return err |
| } |
| r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("rds cluster %s is deleting", node.Annotations[v1alpha1.AnnotationsClusterIdentifier])) |
| } |
| |
| // update storage node status |
| if err := updateClusterStatus(ctx, rdsClient, node, cluster); err != nil { |
| return fmt.Errorf("updateClusterStatus failed: %w", err) |
| } |
| return nil |
| } |
| |
// deleteAWSAurora requests deletion of the backing Aurora cluster, if it exists and is not already deleting, and refreshes the node status.
// nolint:dupl
func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
| if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" { |
| return nil |
| } |
| |
| auroraCluster, err := rdsClient.GetAuroraCluster(ctx, node) |
| if err != nil { |
| return fmt.Errorf("get aurora cluster failed: %w", err) |
| } |
| if auroraCluster != nil && auroraCluster.Status != string(dbmeshawsrds.DBClusterStatusDeleting) { |
| if err := rdsClient.DeleteAuroraCluster(ctx, node, storageProvider); err != nil { |
| r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete aurora cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error()) |
| return err |
| } |
| r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("aurora cluster %s is deleting", node.Annotations[v1alpha1.AnnotationsClusterIdentifier])) |
| } |
| |
| // update storage node status |
| if err := updateClusterStatus(ctx, rdsClient, node, auroraCluster); err != nil { |
| return fmt.Errorf("updateClusterStatus failed: %w", err) |
| } |
| return nil |
| } |
| |
// registerStorageUnit creates the logic database and registers this storage node as a storage unit in ShardingSphere, when enabled via annotation.
func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
| // if register storage unit is not enabled, return |
| if node.Annotations[AnnotationKeyRegisterStorageUnitEnabled] != "true" { |
| return nil |
| } |
| |
| if err := r.validateComputeNodeAnnotations(node); err != nil { |
| return err |
| } |
| |
| // if storage unit is already registered, return |
| if node.Status.Registered { |
| return nil |
| } |
| |
| // if node is not ready, return |
| if node.Status.Phase != v1alpha1.StorageNodePhaseReady { |
| r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterWaiting", "Waiting to register storage unit for node %s/%s: node is not ready", node.GetNamespace(), node.GetName()) |
| return nil |
| } |
| |
| logicDBName := node.Annotations[AnnotationKeyLogicDatabaseName] |
| dbName := node.Annotations[v1alpha1.AnnotationsInstanceDBName] |
| |
| ssServer, err := r.getShardingsphereServer(ctx, node) |
| if err != nil { |
| return fmt.Errorf("getShardingsphereServer failed: %w", err) |
| } |
| |
| defer ssServer.Close() |
| |
| if err := ssServer.CreateDatabase(logicDBName); err != nil { |
| return fmt.Errorf("create database failed: %w", err) |
| } |
| r.Recorder.Eventf(node, corev1.EventTypeNormal, "LogicDatabaseCreated", "LogicDatabase %s is created", logicDBName) |
| |
| var host string |
| var port int32 |
| var username, password string |
	// take the datasource endpoint from the standalone instance, or from the cluster's primary endpoint
	if node.Status.Cluster.Status == "" {
| host, port, username, password = getDatasourceInfoFromInstance(node, storageProvider) |
| } else { |
| host, port, username, password = getDatasourceInfoFromCluster(node, storageProvider) |
| } |
| |
| if err := ssServer.RegisterStorageUnit(logicDBName, getDSName(node), host, uint(port), dbName, username, password); err != nil { |
| return fmt.Errorf("register storage node failed: %w", err) |
| } |
| r.Recorder.Eventf(node, corev1.EventTypeNormal, "StorageUnitRegistered", "StorageUnit %s:%d/%s is registered", host, port, dbName) |
| |
| node.Status.Registered = true |
| return nil |
| } |
| |
| // getDSName returns the datasource name of the storage node. |
| // datasource name only allows letters, numbers and _, and must start with a letter. |
| // ref: https://shardingsphere.apache.org/document/current/en/user-manual/shardingsphere-proxy/distsql/syntax/rdl/storage-unit-definition/register-storage-unit/ |
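// For example, a StorageNode named "storage-node-0" yields the datasource name "ds_storage_node_0".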
| func getDSName(node *v1alpha1.StorageNode) string { |
| return fmt.Sprintf("ds_%s", strings.ReplaceAll(node.GetName(), "-", "_")) |
| } |
| |
// getDatasourceInfoFromInstance returns the endpoint of the first instance plus the master credentials from the node annotations, falling back to the StorageProvider parameters.
func getDatasourceInfoFromInstance(node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (host string, port int32, username, password string) {
| ins := node.Status.Instances[0] |
| host = ins.Endpoint.Address |
| port = ins.Endpoint.Port |
| username = node.Annotations[v1alpha1.AnnotationsMasterUsername] |
| if username == "" { |
| username = storageProvider.Spec.Parameters["masterUsername"] |
| } |
| password = node.Annotations[v1alpha1.AnnotationsMasterUserPassword] |
| if password == "" { |
| password = storageProvider.Spec.Parameters["masterUserPassword"] |
| } |
| return |
| } |
| |
// getDatasourceInfoFromCluster returns the cluster's primary endpoint plus the master credentials from the node annotations, falling back to the StorageProvider parameters.
func getDatasourceInfoFromCluster(node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (host string, port int32, username, password string) {
| cluster := node.Status.Cluster |
| host = cluster.PrimaryEndpoint.Address |
| port = cluster.PrimaryEndpoint.Port |
| username = node.Annotations[v1alpha1.AnnotationsMasterUsername] |
| if username == "" { |
| username = storageProvider.Spec.Parameters["masterUsername"] |
| } |
| password = node.Annotations[v1alpha1.AnnotationsMasterUserPassword] |
| if password == "" { |
| password = storageProvider.Spec.Parameters["masterUserPassword"] |
| } |
| return |
| } |
| |
// unregisterStorageUnit removes this storage node's storage unit from ShardingSphere if it was previously registered.
func (r *StorageNodeReconciler) unregisterStorageUnit(ctx context.Context, node *v1alpha1.StorageNode) error {
| if !node.Status.Registered { |
| return nil |
| } |
| if err := r.validateComputeNodeAnnotations(node); err != nil { |
| return err |
| } |
| |
| logicDBName := node.Annotations[AnnotationKeyLogicDatabaseName] |
| |
| ssServer, err := r.getShardingsphereServer(ctx, node) |
| if err != nil { |
| return fmt.Errorf("getShardingsphereServer failed: %w", err) |
| } |
| |
| defer ssServer.Close() |
| |
| if err := ssServer.UnRegisterStorageUnit(logicDBName, getDSName(node)); err != nil { |
| return fmt.Errorf("unregister storage unit failed: %w", err) |
| } |
| |
| r.Recorder.Eventf(node, corev1.EventTypeNormal, "StorageUnitUnRegistered", "StorageUnit of node %s/%s is unregistered", node.GetNamespace(), node.GetName()) |
| |
| node.Status.Registered = false |
| return nil |
| } |
| |
// validateComputeNodeAnnotations checks that the annotations required to (un)register a storage unit are present and non-empty.
func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
| requiredAnnos := []string{ |
| AnnotationKeyLogicDatabaseName, |
| v1alpha1.AnnotationsInstanceDBName, |
| AnnotationKeyComputeNodeName, |
| } |
| |
| for _, anno := range requiredAnnos { |
| if v, ok := node.Annotations[anno]; !ok || v == "" { |
| r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterChecking", "Waiting to register storage unit for node %s/%s: annotation %s is required", node.GetNamespace(), node.GetName(), anno) |
| return fmt.Errorf("annotation %s is required", anno) |
| } |
| } |
| |
| return nil |
| } |
| |
// getShardingsphereServer opens a connection to the ShardingSphere proxy described by the referenced compute node's server config and service.
func (r *StorageNodeReconciler) getShardingsphereServer(ctx context.Context, node *v1alpha1.StorageNode) (shardingsphere.IServer, error) {
| var ( |
| driver, host, username, password string |
| port uint |
| ) |
| |
| // get compute node |
| cn := &v1alpha1.ComputeNode{} |
| if err := r.Client.Get(ctx, types.NamespacedName{ |
| Name: node.Annotations[AnnotationKeyComputeNodeName], |
| Namespace: node.Namespace, |
| }, cn); err != nil { |
| return nil, fmt.Errorf("get compute node failed: %w", err) |
| } |
| |
| serverConf := cn.Spec.Bootstrap.ServerConfig |
| |
| driver, ok := serverConf.Props[ShardingSphereProtocolType] |
| if !ok || driver == "" { |
| driver = "mysql" |
| } |
| driver = strings.ToLower(driver) |
| |
| if len(serverConf.Authority.Users) == 0 { |
| return nil, fmt.Errorf("no user in compute node %s/%s", cn.Namespace, cn.Name) |
| } |
| |
| username = strings.Split(serverConf.Authority.Users[0].User, "@")[0] |
| password = serverConf.Authority.Users[0].Password |
| |
| // get service of compute node |
| svc, err := r.Service.GetByNamespacedName(ctx, types.NamespacedName{ |
| Name: node.Annotations[AnnotationKeyComputeNodeName], |
| Namespace: node.Namespace, |
| }) |
| |
	if err != nil {
		return nil, fmt.Errorf("get service failed: %w", err)
	}
	if svc == nil {
		return nil, fmt.Errorf("service %s/%s not found", node.Namespace, node.Annotations[AnnotationKeyComputeNodeName])
	}
| |
| host = fmt.Sprintf("%s.%s", svc.Name, svc.Namespace) |
| |
	if len(svc.Spec.Ports) == 0 {
		return nil, fmt.Errorf("service %s/%s has no ports", svc.Namespace, svc.Name)
	}
	port = uint(svc.Spec.Ports[0].Port)
| |
| ssServer, err := shardingsphere.NewServer(driver, host, port, username, password) |
| if err != nil { |
| return nil, fmt.Errorf("new shardingsphere server failed: %w", err) |
| } |
| |
| return ssServer, nil |
| } |
| |
// reconcileCloudNativePG syncs the storage node with its CloudNativePG Cluster: it refreshes the node status from the observed cluster, then creates or updates the cluster as needed.
func (r *StorageNodeReconciler) reconcileCloudNativePG(ctx context.Context, sn *v1alpha1.StorageNode, sp *v1alpha1.StorageProvider) error {
| cluster, err := r.getCloudNativePGCluster(ctx, types.NamespacedName{Namespace: sn.Namespace, Name: sn.Name}) |
| if err != nil { |
| return err |
| } |
| |
	if err := updateCloudNativePGClusterStatus(sn, cluster); err != nil {
| return err |
| } |
| |
| if cluster != nil { |
| return r.updateCloudNativePGCluster(ctx, sn, sp, cluster) |
| } |
| return r.createCloudNativePGCluster(ctx, sn, sp) |
| } |
| |
// updateCloudNativePGClusterStatus mirrors the observed CloudNativePG Cluster (phase, endpoints, instances) onto the node status.
// It is distinct from the reconciler's updateCloudNativePGCluster method, which updates the Cluster resource itself.
func updateCloudNativePGClusterStatus(node *v1alpha1.StorageNode, cluster *cnpg.Cluster) error {
| if cluster == nil { |
| return nil |
| } |
	var cs string
	if cluster.Status.Phase == cnpg.PhaseHealthy {
		node.Status.Phase = v1alpha1.StorageNodePhaseReady
		cs = "available"
	} else {
		// leave cs empty: computeDesiredState then derives readiness from the per-instance statuses
		node.Status.Phase = v1alpha1.StorageNodePhaseNotReady
	}
| |
| node.Status.Cluster = v1alpha1.ClusterStatus{ |
| Status: cs, |
| PrimaryEndpoint: v1alpha1.Endpoint{ |
| Address: cluster.Status.WriteService, |
| Port: 5432, |
| }, |
| ReaderEndpoints: []v1alpha1.Endpoint{ |
| { |
| Address: cluster.Status.ReadService, |
| Port: 5432, |
| }, |
| }, |
| } |
| |
| node.Status.Instances = []v1alpha1.InstanceStatus{} |
| |
| for s, pgins := range cluster.Status.InstancesStatus { |
| var stat string |
| if s == cnpgutils.PodHealthy { |
| stat = "available" |
| } else { |
| stat = string(s) |
| } |
| for _, pg := range pgins { |
| ins := v1alpha1.InstanceStatus{ |
| Status: stat, |
| Endpoint: v1alpha1.Endpoint{ |
| Address: pg, |
| Port: 5432, |
| }, |
| } |
| |
| node.Status.Instances = append(node.Status.Instances, ins) |
| } |
| } |
| |
| return nil |
| } |
| |
// getCloudNativePGCluster fetches the CloudNativePG Cluster with the given namespaced name; a nil cluster is returned when it does not exist.
func (r *StorageNodeReconciler) getCloudNativePGCluster(ctx context.Context, namespacedName types.NamespacedName) (*cnpg.Cluster, error) {
	return r.CNPG.GetClusterByNamespacedName(ctx, namespacedName)
}
| |
// createCloudNativePGCluster creates the CloudNativePG Cluster built for the storage node; an AlreadyExists error is treated as success.
func (r *StorageNodeReconciler) createCloudNativePGCluster(ctx context.Context, sn *v1alpha1.StorageNode, sp *v1alpha1.StorageProvider) error {
	cluster := r.CNPG.Build(ctx, sn, sp)
	if err := r.CNPG.Create(ctx, cluster); err != nil && !apierrors.IsAlreadyExists(err) {
		return err
	}
	return nil
}
| |
// updateCloudNativePGCluster rebuilds the expected Cluster spec and updates the live object when the specs diverge.
func (r *StorageNodeReconciler) updateCloudNativePGCluster(ctx context.Context, sn *v1alpha1.StorageNode, sp *v1alpha1.StorageProvider, cluster *cnpg.Cluster) error {
	exp := r.CNPG.Build(ctx, sn, sp)
	// carry over the live object's metadata (which already includes labels and annotations) so only the spec is compared
	exp.ObjectMeta = cluster.ObjectMeta
| |
| if !reflect.DeepEqual(cluster.Spec, exp.Spec) { |
| return r.CNPG.Update(ctx, exp) |
| } |
| return nil |
| } |
| |
| // SetupWithManager sets up the controller with the Manager |
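// A minimal wiring sketch from a main package (field values are illustrative; the Service,
// CNPG, and AWS fields are omitted here and must be populated in a real deployment):
//
//	r := &StorageNodeReconciler{
//		Client:   mgr.GetClient(),
//		Scheme:   mgr.GetScheme(),
//		Log:      ctrl.Log.WithName(StorageNodeControllerName),
//		Recorder: mgr.GetEventRecorderFor(StorageNodeControllerName),
//	}
//	if err := r.SetupWithManager(mgr); err != nil {
//		// handle the error, e.g. log it and exit
//	}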
| func (r *StorageNodeReconciler) SetupWithManager(mgr ctrl.Manager) error { |
| return ctrl.NewControllerManagedBy(mgr). |
| For(&v1alpha1.StorageNode{}). |
| Complete(r) |
| } |