blob: 41c80e09526308c729d279efa2c76f772b0ab6cd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2etest
import (
"testing"
"github.com/apache/airflow-on-k8s-operator/api/v1alpha1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-reconciler/pkg/test"
)
const (
CRName = "AirflowCluster"
SampleDir = "../../../hack/sample/"
)
var f *test.Framework
var ctx, basectx *test.Context
var deleteBase bool
func airflowBase(file string) *v1alpha1.AirflowBase {
cr := &v1alpha1.AirflowBase{}
if err := f.LoadFromFile(file, cr); err != nil {
return nil
}
return cr
}
func airflowCluster(file string) *v1alpha1.AirflowCluster {
cr := &v1alpha1.AirflowCluster{}
if err := f.LoadFromFile(file, cr); err != nil {
return nil
}
return cr
}
func Test(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, CRName+" Suite")
}
var _ = BeforeSuite(func() {
f = test.New(CRName)
err := v1alpha1.SchemeBuilder.AddToScheme(f.GetScheme())
Expect(err).NotTo(HaveOccurred(), "failed to initialize the Framework: %v", err)
f.Start()
})
var _ = AfterSuite(func() {
if ctx != nil {
ctx.DeleteCR()
}
if basectx != nil {
basectx.DeleteCR()
}
if f != nil {
f.Stop()
}
})
func isBaseReady(cr interface{}) bool {
stts := cr.(*v1alpha1.AirflowBase).Status
return stts.IsReady()
}
func isClusterReady(cr interface{}) bool {
stts := cr.(*v1alpha1.AirflowCluster).Status
return stts.IsReady()
}
func isMemoryStoreReady(cr interface{}) bool {
stts := cr.(*v1alpha1.AirflowCluster).Spec.MemoryStore.Status
return stts.IsReady()
}
func checkLocal(cr *v1alpha1.AirflowCluster) {
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-airflowui", 1, 1)
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-scheduler", 1, 1)
ctx.WithTimeout(200).CheckCR(isClusterReady)
}
func checkCelery(cr *v1alpha1.AirflowCluster) {
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-airflowui", 1, 1)
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-flower", 1, 1)
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-redis", 1, 1)
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-scheduler", 1, 1)
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-worker", 2, 1)
ctx.WithTimeout(10).CheckService(cr.Name+"-redis", map[string]int32{"redis": 6379})
ctx.WithTimeout(200).CheckCR(isClusterReady)
}
func checkCeleryMemoryStore(cr *v1alpha1.AirflowCluster) {
ctx.WithTimeout(500).CheckCR(isMemoryStoreReady)
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-airflowui", 1, 1)
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-flower", 1, 1)
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-scheduler", 1, 1)
ctx.WithTimeout(200).CheckStatefulSet(cr.Name+"-worker", 2, 1)
ctx.WithTimeout(200).CheckCR(isClusterReady)
}
var _ = Describe(CRName+" controller tests", func() {
AfterEach(func() {
if ctx != nil {
ctx.DeleteCR()
}
if deleteBase {
deleteBase = false
basectx.DeleteCR()
basectx = nil
}
ctx = nil
})
It("creating a "+CRName+" with cloudsql", func() {
basectx = f.NewContext().WithCR(airflowBase(SampleDir + "cloudsql-celery/base.yaml"))
cr := basectx.CR.(*v1alpha1.AirflowBase)
By("creating a new " + CRName + ": " + cr.Name)
basectx.CreateCR()
basectx.WithTimeout(200).CheckStatefulSet(cr.Name+"-sqlproxy", 1, 1)
basectx.WithTimeout(10).CheckService(cr.Name+"-sql", map[string]int32{"sqlproxy": 5432})
//basectx.WithTimeout(10).CheckSecret(name)
basectx.WithTimeout(200).CheckStatefulSet(cr.Name+"-nfs", 1, 1)
basectx.WithTimeout(100).CheckCR(isBaseReady)
})
// https://github.com/GoogleCloudPlatform/airflow-operator/issues/48
It("creating a "+CRName+" with cloudsql, celery executor", func() {
ctx = f.NewContext().WithCR(airflowCluster(SampleDir + "cloudsql-celery/cluster.yaml"))
cr := ctx.CR.(*v1alpha1.AirflowCluster)
By("creating a new " + CRName + ": " + cr.Name)
ctx.CreateCR()
checkCelery(cr)
})
It("creating a "+CRName+" with cloudsql, local executor", func() {
ctx = f.NewContext().WithCR(airflowCluster(SampleDir + "cloudsql-local/cluster.yaml"))
cr := ctx.CR.(*v1alpha1.AirflowCluster)
By("creating a new " + CRName + ": " + cr.Name)
ctx.CreateCR()
checkLocal(cr)
})
It("creating a "+CRName+" with cloudsql, k8s executor", func() {
ctx = f.NewContext().WithCR(airflowCluster(SampleDir + "cloudsql-k8s/cluster.yaml"))
cr := ctx.CR.(*v1alpha1.AirflowCluster)
By("creating a new " + CRName + ": " + cr.Name)
ctx.CreateCR()
checkLocal(cr)
deleteBase = true
})
It("creating a "+CRName+" with mysql, celery executor and gcs", func() {
basectx = f.NewContext().WithCR(airflowBase(SampleDir + "mysql-celery/base.yaml"))
ctx = f.NewContext().WithCR(airflowCluster(SampleDir + "mysql-celery-gcs/cluster.yaml"))
basecr := basectx.CR.(*v1alpha1.AirflowBase)
cr := ctx.CR.(*v1alpha1.AirflowCluster)
By("creating a base " + basecr.Name)
basectx.CreateCR()
basectx.WithTimeout(200).CheckCR(isBaseReady)
By("creating a new " + CRName + ": " + cr.Name)
ctx.CreateCR()
checkCelery(cr)
})
It("creating a "+CRName+" with postgres, celery executor and memorystore", func() {
basectx = f.NewContext().WithCR(airflowBase(SampleDir + "postgres-celery/base.yaml"))
ctx = f.NewContext().WithCR(airflowCluster(SampleDir + "postgres-celery-memorystore/cluster.yaml"))
basecr := basectx.CR.(*v1alpha1.AirflowBase)
cr := ctx.CR.(*v1alpha1.AirflowCluster)
By("creating a base " + basecr.Name)
basectx.CreateCR()
basectx.WithTimeout(200).CheckCR(isBaseReady)
By("creating a new " + CRName + ": " + cr.Name)
ctx.CreateCR()
checkCeleryMemoryStore(cr)
})
})