blob: 1fac6f8d9bac29073c8d09a85ecdce575b460e35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package e2e
import (
"context"
"database/sql"
"fmt"
"reflect"
"regexp"
"time"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
"bou.ke/monkey"
"github.com/DATA-DOG/go-sqlmock"
dbmeshawsrds "github.com/database-mesh/golang-sdk/aws/client/rds"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
var _ = Describe("StorageNode Controller Suite Test For AWS RDS Instance", func() {
storageProviderName := "test-storage-provider"
instanceIdentifier := "test-instance-identifier"
BeforeEach(func() {
StorageProvider := &v1alpha1.StorageProvider{
ObjectMeta: metav1.ObjectMeta{
Name: storageProviderName,
},
Spec: v1alpha1.StorageProviderSpec{
Provisioner: v1alpha1.ProvisionerAWSRDSInstance,
Parameters: map[string]string{
"engine": "mysql",
"engineVersion": "5.7",
"instanceClass": "db.t3.micro",
"allocatedStorage": "20",
"masterUsername": "root",
"masterUserPassword": "root123456",
},
},
}
Expect(k8sClient.Create(ctx, StorageProvider)).Should(Succeed())
})
AfterEach(func() {
StorageProvider := &v1alpha1.StorageProvider{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: storageProviderName}, StorageProvider)).Should(Succeed())
Expect(k8sClient.Delete(ctx, StorageProvider)).Should(Succeed())
})
Context("reconcile storageNode", func() {
AfterEach(func() {
monkey.UnpatchAll()
})
It("should create success", func() {
// mock get instance func returns success
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescInstance, error) {
return &dbmeshawsrds.DescInstance{
DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
Endpoint: dbmeshawsrds.Endpoint{
Address: "127.0.0.1",
Port: 3306,
},
}, nil
})
// mock delete instance func returns success
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "DeleteInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode, _ *v1alpha1.StorageProvider) error {
return nil
})
nodeName := "test-storage-node-ready"
node := &v1alpha1.StorageNode{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Namespace: "default",
Annotations: map[string]string{
v1alpha1.AnnotationsInstanceIdentifier: instanceIdentifier,
},
},
Spec: v1alpha1.StorageNodeSpec{
StorageProviderName: storageProviderName,
},
}
// create resource
Expect(k8sClient.Create(ctx, node)).Should(Succeed())
// check storage node status
Eventually(func() v1alpha1.StorageNodePhaseStatus {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Phase
}, 10*time.Second, 1*time.Second).Should(Equal(v1alpha1.StorageNodePhaseReady))
// delete resource
Expect(k8sClient.Delete(ctx, node)).Should(Succeed())
})
It("should delete success", func() {
nodeName := "test-storage-node-delete"
node := &v1alpha1.StorageNode{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Namespace: "default",
Annotations: map[string]string{
v1alpha1.AnnotationsInstanceIdentifier: instanceIdentifier,
},
},
Spec: v1alpha1.StorageNodeSpec{
StorageProviderName: storageProviderName,
},
}
Expect(k8sClient.Create(ctx, node)).Should(Succeed())
getNode := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, getNode)).Should(Succeed())
// delete storage node
Expect(k8sClient.Delete(ctx, getNode)).Should(Succeed())
Eventually(func() bool {
newSN := &v1alpha1.StorageNode{}
err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)
return err != nil
}, 10*time.Second, 1*time.Second).Should(BeTrue())
})
It("should register and unregister storage unit success", func() {
// mock mysql
db, dbmock, err := sqlmock.New()
Expect(err).Should(Succeed())
Expect(dbmock).ShouldNot(BeNil())
// mock rds DescribeDBInstances func returns success
g := monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescInstance, error) {
return &dbmeshawsrds.DescInstance{
DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
Endpoint: dbmeshawsrds.Endpoint{
Address: "127.0.0.1",
Port: 3306,
},
}, nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "DeleteInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode, _ *v1alpha1.StorageProvider) error {
return nil
})
monkey.Patch(sql.Open, func(_ string, _ string) (*sql.DB, error) {
return db, nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(db), "Close", func(_ *sql.DB) error {
return nil
})
cn := &v1alpha1.ComputeNode{
ObjectMeta: metav1.ObjectMeta{
Name: "test-compute-node",
Namespace: "default",
Labels: map[string]string{
"app": "test-app",
},
},
Spec: v1alpha1.ComputeNodeSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "test-app",
},
},
PortBindings: []v1alpha1.PortBinding{
{
Name: "http",
ContainerPort: 3307,
Protocol: "TCP",
ServicePort: 3307,
},
},
Bootstrap: v1alpha1.BootstrapConfig{
ServerConfig: v1alpha1.ServerConfig{
Authority: v1alpha1.ComputeNodeAuthority{
Users: []v1alpha1.ComputeNodeUser{
{
User: "test-user",
Password: "test-password",
},
},
Privilege: v1alpha1.ComputeNodePrivilege{
Type: v1alpha1.AllPermitted,
},
},
Props: map[string]string{
"proxy-frontend-database-protocol-type": "MySQL",
},
Mode: v1alpha1.ComputeNodeServerMode{
Repository: v1alpha1.Repository{
Type: "ZooKeeper",
Props: nil,
},
Type: "Zookeeper",
},
},
},
},
}
nodeName := "test-storage-node-register"
node := &v1alpha1.StorageNode{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Namespace: "default",
Annotations: map[string]string{
v1alpha1.AnnotationsInstanceIdentifier: instanceIdentifier,
controllers.AnnotationKeyRegisterStorageUnitEnabled: "true",
v1alpha1.AnnotationsInstanceDBName: "test-db-name",
controllers.AnnotationKeyComputeNodeName: "test-compute-node",
controllers.AnnotationKeyLogicDatabaseName: "test-logic-db-name",
},
},
Spec: v1alpha1.StorageNodeSpec{
StorageProviderName: storageProviderName,
},
}
Expect(k8sClient.Create(ctx, cn)).Should(Succeed())
Expect(k8sClient.Create(ctx, node)).Should(Succeed())
dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(1, 1))
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1, 1))
dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE UNIT IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
Eventually(func() v1alpha1.StorageNodePhaseStatus {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Phase
}, time.Second*10, time.Millisecond*250).Should(Equal(v1alpha1.StorageNodePhaseReady))
Eventually(func() bool {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Registered
}, 20, 2).Should(BeTrue())
// delete storage node
Expect(k8sClient.Delete(ctx, node)).Should(Succeed())
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1, 1))
dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", "name"}).AddRow("sharding", "t_order"))
dbmock.ExpectExec("DROP SHARDING TABLE RULE").WillReturnResult(sqlmock.NewResult(1, 1))
dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE UNIT")).WillReturnResult(sqlmock.NewResult(0, 0))
Eventually(func() v1alpha1.StorageNodePhaseStatus {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Phase
}, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseDeleting))
g.Unpatch()
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescInstance, error) {
return nil, nil
})
Eventually(func() bool {
newSN := &v1alpha1.StorageNode{}
err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)
return apierrors.IsNotFound(err)
}, 20, 2).Should(BeTrue())
})
})
})
var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", func() {
storageProviderName := "test-storage-provider"
clusterIdentifier := "test-aurora-cluster-identifier"
BeforeEach(func() {
provider := &v1alpha1.StorageProvider{
ObjectMeta: metav1.ObjectMeta{
Name: storageProviderName,
},
Spec: v1alpha1.StorageProviderSpec{
Provisioner: v1alpha1.ProvisionerAWSAurora,
Parameters: map[string]string{
"engine": "aurora-mysql",
"engineVersion": "5.7",
"instanceClass": "db.t2.small",
"masterUsername": "test-user",
"masterUserPassword": "test-password",
},
},
}
Expect(k8sClient.Create(ctx, provider)).Should(Succeed())
})
AfterEach(func() {
monkey.UnpatchAll()
StorageProvider := &v1alpha1.StorageProvider{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: storageProviderName}, StorageProvider)).Should(Succeed())
Expect(k8sClient.Delete(ctx, StorageProvider)).Should(Succeed())
})
Context("When Creat StorageNode with Aurora Cluster Not Exist", func() {
It("Should Success", func() {
snName := "test-storage-node-creating"
// monkey patch
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescCluster, error) {
return &dbmeshawsrds.DescCluster{
DBClusterIdentifier: clusterIdentifier,
Status: string(dbmeshawsrds.DBClusterStatusCreating),
PrimaryEndpoint: "test-primary-endpoint",
ReaderEndpoint: "test-reader-endpoint",
Port: 3306,
}, nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstancesByFilters", func(_ *aws.RdsClient, _ context.Context, _ map[string][]string) ([]*dbmeshawsrds.DescInstance, error) {
return []*dbmeshawsrds.DescInstance{
{
DBInstanceIdentifier: fmt.Sprintf("%s-insatnce-0", clusterIdentifier),
DBInstanceStatus: dbmeshawsrds.DBInstanceStatusCreating,
Endpoint: dbmeshawsrds.Endpoint{Address: "test-instance-0-endpoint", Port: 3306},
},
}, nil
})
sn := &v1alpha1.StorageNode{
ObjectMeta: metav1.ObjectMeta{
Name: snName,
Namespace: "default",
Annotations: map[string]string{
v1alpha1.AnnotationsClusterIdentifier: clusterIdentifier,
},
},
Spec: v1alpha1.StorageNodeSpec{
StorageProviderName: storageProviderName,
Replicas: 2,
},
}
Expect(k8sClient.Create(ctx, sn)).Should(Succeed())
Eventually(func() string {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: snName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Cluster.Status
}, time.Second*10, time.Millisecond*250).Should(Equal(string(dbmeshawsrds.DBClusterStatusCreating)))
})
It("should success when cluster is available", func() {
snName := "test-storage-node-available"
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescCluster, error) {
return &dbmeshawsrds.DescCluster{
DBClusterIdentifier: clusterIdentifier,
Status: string(dbmeshawsrds.DBClusterStatusAvailable),
PrimaryEndpoint: "test-primary-endpoint",
ReaderEndpoint: "test-reader-endpoint",
Port: 3306,
}, nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstancesByFilters", func(_ *aws.RdsClient, _ context.Context, _ map[string][]string) ([]*dbmeshawsrds.DescInstance, error) {
return []*dbmeshawsrds.DescInstance{
{
DBInstanceIdentifier: fmt.Sprintf("%s-insatnce-0", clusterIdentifier),
DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
Endpoint: dbmeshawsrds.Endpoint{Address: "test-instance-0-endpoint", Port: 3306},
},
{
DBInstanceIdentifier: fmt.Sprintf("%s-insatnce-1", clusterIdentifier),
DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
Endpoint: dbmeshawsrds.Endpoint{Address: "test-instance-1-endpoint", Port: 3306},
},
}, nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "DeleteAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode, _ *v1alpha1.StorageProvider) error {
return nil
})
sn := &v1alpha1.StorageNode{
ObjectMeta: metav1.ObjectMeta{
Name: snName,
Namespace: "default",
Annotations: map[string]string{
v1alpha1.AnnotationsClusterIdentifier: clusterIdentifier,
},
},
Spec: v1alpha1.StorageNodeSpec{
StorageProviderName: storageProviderName,
Replicas: 2,
},
}
Expect(k8sClient.Create(ctx, sn)).Should(Succeed())
newSN := &v1alpha1.StorageNode{}
Eventually(func() v1alpha1.StorageNodePhaseStatus {
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: snName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Phase
}, time.Second*10, time.Millisecond*250).Should(Equal(v1alpha1.StorageNodePhaseReady))
Expect(newSN.Status.Instances).Should(HaveLen(2))
})
})
})