blob: 08529375ea50ca0c0e7b61483724a89135db6221 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controllers
import (
"context"
"encoding/base64"
"sort"
"strconv"
"strings"
"time"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
alpha1 "github.com/apache/airflow-on-k8s-operator/api/v1alpha1"
"github.com/apache/airflow-on-k8s-operator/controllers/application"
"github.com/apache/airflow-on-k8s-operator/controllers/common"
app "github.com/kubernetes-sigs/application/pkg/apis/app/v1beta1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
"sigs.k8s.io/controller-reconciler/pkg/finalizer"
gr "sigs.k8s.io/controller-reconciler/pkg/genericreconciler"
"sigs.k8s.io/controller-reconciler/pkg/reconciler"
"sigs.k8s.io/controller-reconciler/pkg/reconciler/manager/gcp"
"sigs.k8s.io/controller-reconciler/pkg/reconciler/manager/gcp/redis"
"sigs.k8s.io/controller-reconciler/pkg/reconciler/manager/k8s"
"sigs.k8s.io/controller-reconciler/pkg/status"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
// Env-var prefixes and DAG folder layout shared by all cluster components.
const (
	afk = "AIRFLOW__KUBERNETES__" // prefix for kubernetes-executor airflow settings
	afc = "AIRFLOW__CORE__"       // prefix for core airflow settings

	gitSyncDestDir  = "gitdags" // subdir that the git-sync container syncs into
	gCSSyncDestDir  = "dags"    // subdir that the GCS sync container syncs into
	airflowHome     = "/usr/local/airflow"
	airflowDagsBase = airflowHome + "/dags/" // mount point for DAG volumes
)
// AirflowClusterReconciler reconciles a AirflowCluster object.
// It is registered with the controller manager in SetupWithManager; the
// actual reconciliation logic is delegated to the generic reconciler
// assembled by acReconciler.
type AirflowClusterReconciler struct {
	client.Client
	Log    logr.Logger
	Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=app.k8s.io,resources=applications,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=rolebindings,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=,resources=secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=,resources=serviceaccounts,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=airflow.apache.org,resources=airflowclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=airflow.apache.org,resources=airflowclusters/status,verbs=get;update;patch
// Reconcile is a placeholder required by the controller-runtime scaffolding.
// The real work is done by the generic reconciler wired in SetupWithManager.
// TODO: remove once no longer required.
func (r *AirflowClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("airflowcluster", req.NamespacedName)
	_, _ = ctx, log // intentionally unused in this stub
	return ctrl.Result{}, nil
}
// SetupWithManager - called by main. It registers the Application types in
// the scheme and wires the generic reconciler (acReconciler) as the handler
// for AirflowCluster objects.
func (r *AirflowClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Propagate scheme-registration failures instead of silently discarding
	// them; a missing Application type would otherwise surface much later as
	// an opaque runtime error.
	if err := app.AddToScheme(r.Scheme); err != nil {
		return err
	}
	return ctrl.NewControllerManagedBy(mgr).
		For(&alpha1.AirflowCluster{}).
		Complete(acReconciler(mgr))
}
// acReconciler assembles the generic reconciler for AirflowCluster: one
// component handler per cluster part plus error/validation/defaulting hooks.
func acReconciler(mgr manager.Manager) *gr.Reconciler {
	return gr.
		WithManager(mgr).
		// GCP Cloud Memorystore resource manager, used by the MemoryStore component.
		WithResourceManager(redis.Getter(context.TODO())).
		For(&alpha1.AirflowCluster{}, alpha1.GroupVersion).
		// Component handlers; each contributes Objects/Observables/status hooks.
		Using(&UI{}).
		Using(&Redis{}).
		Using(&MemoryStore{}).
		Using(&Flower{}).
		Using(&Scheduler{}).
		Using(&Worker{}).
		Using(&Cluster{}).
		WithErrorHandler(acHandleError).
		WithValidator(acValidate).
		WithDefaulter(acApplyDefaults).
		Build()
}
// acHandleError records err on the cluster status, or clears any previously
// recorded error when err is nil. The kind argument is currently unused.
func acHandleError(resource interface{}, err error, kind string) {
	cluster := resource.(*alpha1.AirflowCluster)
	if err == nil {
		cluster.Status.ClearError()
		return
	}
	cluster.Status.SetError("ErrorSeen", err.Error())
}
// acValidate delegates to the AirflowCluster's own validation logic.
func acValidate(resource interface{}) error {
	return resource.(*alpha1.AirflowCluster).Validate()
}
// acApplyDefaults delegates to the AirflowCluster's own defaulting logic.
func acApplyDefaults(resource interface{}) {
	resource.(*alpha1.AirflowCluster).ApplyDefaults()
}
// The empty structs below are component handlers plugged into the generic
// reconciler (see acReconciler); each implements the reconciler hooks
// (Objects, Observables, DependentResources, ...) for one part of the cluster.

// Cluster - interface to handle airflowbase
type Cluster struct{}

// Redis - interface to handle redis
type Redis struct{}

// Flower - interface to handle flower
type Flower struct{}

// Scheduler - interface to handle scheduler
type Scheduler struct{}

// Worker - interface to handle worker
type Worker struct{}

// UI - interface to handle ui
type UI struct{}

// MemoryStore - interface to handle memorystore
type MemoryStore struct{}
// --------------- common functions -------------------------

// envFromSecret builds an EnvVarSource that reads the given key from the
// named Kubernetes secret.
func envFromSecret(name string, key string) *corev1.EnvVarSource {
	selector := &corev1.SecretKeySelector{Key: key}
	selector.LocalObjectReference = corev1.LocalObjectReference{Name: name}
	return &corev1.EnvVarSource{SecretKeyRef: selector}
}
// IsPostgres reports whether the base deploys Postgres, either directly or
// through a SQL proxy of postgres type.
func IsPostgres(s *alpha1.AirflowBaseSpec) bool {
	if s.Postgres != nil {
		return true
	}
	return s.SQLProxy != nil && s.SQLProxy.Type == common.ValueSQLProxyTypePostgres
}
// updateSts applies the mutations shared by all airflow StatefulSets: the
// airflow environment on the first container and the DAG-sync containers.
// It returns the unwrapped StatefulSet and template value so callers can
// apply component-specific customization.
func updateSts(o *reconciler.Object, v interface{}) (*appsv1.StatefulSet, *common.TemplateValue) {
	r := v.(*common.TemplateValue)
	sts := o.Obj.(*k8s.Object).Obj.(*appsv1.StatefulSet)
	// sts.Name is passed as the service-account name used for K8s-executor
	// worker pods (see getAirflowEnv / WORKER_SERVICE_ACCOUNT_NAME).
	sts.Spec.Template.Spec.Containers[0].Env = getAirflowEnv(r.Cluster, sts.Name, r.Base)
	addAirflowContainers(r.Cluster, sts)
	return sts, r
}
// acTemplateValue builds the template value shared by the component
// templates, resolving the referred AirflowBase from the dependent objects.
// Name, SecretName, and SvcName all use the same component resource name.
func acTemplateValue(r *alpha1.AirflowCluster, dependent []reconciler.Object, component string, label, selector, ports map[string]string) *common.TemplateValue {
	base := k8s.GetItem(dependent, &alpha1.AirflowBase{}, r.Spec.AirflowBaseRef.Name, r.Namespace).(*alpha1.AirflowBase)
	name := common.RsrcName(r.Name, component, "")
	return &common.TemplateValue{
		Name:       name,
		Namespace:  r.Namespace,
		SecretName: name,
		SvcName:    name,
		Cluster:    r,
		Base:       base,
		Labels:     label,
		Selector:   selector,
		Ports:      ports,
	}
}
// addAirflowContainers appends the DAG sync container (git or GCS) to the
// StatefulSet: as an init container for a one-shot sync, otherwise as a
// long-running sidecar.
func addAirflowContainers(r *alpha1.AirflowCluster, ss *appsv1.StatefulSet) {
	if r.Spec.DAGs == nil {
		return
	}
	once, dc := dagContainer(r.Spec.DAGs, "dags-data")
	podSpec := &ss.Spec.Template.Spec
	if once {
		podSpec.InitContainers = append(podSpec.InitContainers, dc)
		return
	}
	podSpec.Containers = append(podSpec.Containers, dc)
}
// addMySQLUserDBContainer prepends an init container that provisions the
// airflow database and user on the shared MySQL instance of the referred
// AirflowBase, using the base's root credentials.
func addMySQLUserDBContainer(r *alpha1.AirflowCluster, ss *appsv1.StatefulSet) {
	// Root secret and SQL service live with the AirflowBase; the per-cluster
	// user password is the UI component secret of this cluster.
	sqlRootSecret := common.RsrcName(r.Spec.AirflowBaseRef.Name, common.ValueAirflowComponentSQL, "")
	sqlSvcName := common.RsrcName(r.Spec.AirflowBaseRef.Name, common.ValueAirflowComponentSQL, "")
	sqlSecret := common.RsrcName(r.Name, common.ValueAirflowComponentUI, "")
	env := []corev1.EnvVar{
		{Name: "SQL_ROOT_PASSWORD", ValueFrom: envFromSecret(sqlRootSecret, "rootpassword")},
		{Name: "SQL_DB", Value: r.Spec.Scheduler.DBName},
		{Name: "SQL_USER", Value: r.Spec.Scheduler.DBUser},
		{Name: "SQL_PASSWORD", ValueFrom: envFromSecret(sqlSecret, "password")},
		{Name: "SQL_HOST", Value: sqlSvcName},
		{Name: "DB_TYPE", Value: "mysql"},
	}
	// The $(VAR) references below are expanded by the kubelet from the env
	// list above before the shell runs.
	containers := []corev1.Container{
		{
			Name:    "mysql-dbcreate",
			Image:   alpha1.DefaultMySQLImage + ":" + alpha1.DefaultMySQLVersion,
			Env:     env,
			Command: []string{"/bin/bash"},
			//SET GLOBAL explicit_defaults_for_timestamp=ON;
			Args: []string{"-c", `
mysql -uroot -h$(SQL_HOST) -p$(SQL_ROOT_PASSWORD) << EOSQL
CREATE DATABASE IF NOT EXISTS $(SQL_DB);
USE $(SQL_DB);
CREATE USER IF NOT EXISTS '$(SQL_USER)'@'%' IDENTIFIED BY '$(SQL_PASSWORD)';
GRANT ALL ON $(SQL_DB).* TO '$(SQL_USER)'@'%' ;
FLUSH PRIVILEGES;
SHOW GRANTS FOR $(SQL_USER);
EOSQL
`},
		},
	}
	// Prepend so DB provisioning runs before any other init container.
	ss.Spec.Template.Spec.InitContainers = append(containers, ss.Spec.Template.Spec.InitContainers...)
}
// addPostgresUserDBContainer prepends an init container that provisions the
// airflow database and user on the shared Postgres instance of the referred
// AirflowBase, using the base's root credentials.
func addPostgresUserDBContainer(r *alpha1.AirflowCluster, ss *appsv1.StatefulSet) {
	sqlRootSecret := common.RsrcName(r.Spec.AirflowBaseRef.Name, common.ValueAirflowComponentSQL, "")
	sqlSvcName := common.RsrcName(r.Spec.AirflowBaseRef.Name, common.ValueAirflowComponentSQL, "")
	sqlSecret := common.RsrcName(r.Name, common.ValueAirflowComponentUI, "")
	env := []corev1.EnvVar{
		{Name: "SQL_ROOT_PASSWORD", ValueFrom: envFromSecret(sqlRootSecret, "rootpassword")},
		{Name: "SQL_DB", Value: r.Spec.Scheduler.DBName},
		{Name: "SQL_USER", Value: r.Spec.Scheduler.DBUser},
		{Name: "SQL_PASSWORD", ValueFrom: envFromSecret(sqlSecret, "password")},
		{Name: "SQL_HOST", Value: sqlSvcName},
		{Name: "DB_TYPE", Value: "postgres"},
	}
	// NOTE(review): the script mixes $(SQL_HOST)-style kubelet expansion with
	// $SQL_HOST-style shell expansion; both resolve because SQL_HOST is in the
	// container env, but the inconsistency is worth normalizing — confirm.
	containers := []corev1.Container{
		{
			Name:    "postgres-dbcreate",
			Image:   alpha1.DefaultPostgresImage + ":" + alpha1.DefaultPostgresVersion,
			Env:     env,
			Command: []string{"/bin/bash"},
			// Create DB and user only when the database does not already exist.
			Args: []string{"-c", `
PGPASSWORD=$(SQL_ROOT_PASSWORD) psql -h $SQL_HOST -U postgres -tc "SELECT 1 FROM pg_database WHERE datname = '$(SQL_DB)'" | grep -q 1 || (PGPASSWORD=$(SQL_ROOT_PASSWORD) psql -h $SQL_HOST -U postgres -c "CREATE DATABASE $(SQL_DB)" &&
PGPASSWORD=$(SQL_ROOT_PASSWORD) psql -h $SQL_HOST -U postgres -c "CREATE USER $(SQL_USER) WITH ENCRYPTED PASSWORD '$(SQL_PASSWORD)'; GRANT ALL PRIVILEGES ON DATABASE $(SQL_DB) TO $(SQL_USER)")
`},
		},
	}
	// Prepend so DB provisioning runs before any other init container.
	ss.Spec.Template.Spec.InitContainers = append(containers, ss.Spec.Template.Spec.InitContainers...)
}
// dependantResources returns the AirflowBase referred to by the cluster as
// the single dependent resource shared by all components.
func dependantResources(i interface{}) []reconciler.Object {
	r := i.(*alpha1.AirflowCluster)
	base := k8s.ReferredItem(&alpha1.AirflowBase{}, r.Spec.AirflowBaseRef.Name, r.Namespace)
	return []reconciler.Object{base}
}
// getAirflowPrometheusEnv builds the environment for the prometheus exporter
// sidecar, pointing it at the airflow metadata database.
func getAirflowPrometheusEnv(r *alpha1.AirflowCluster, base *alpha1.AirflowBase) []corev1.EnvVar {
	const prefix = "AIRFLOW_PROMETHEUS_"
	const db = prefix + "DATABASE_"
	sqlSvcName := common.RsrcName(r.Spec.AirflowBaseRef.Name, common.ValueAirflowComponentSQL, "")
	sqlSecret := common.RsrcName(r.Name, common.ValueAirflowComponentUI, "")
	backend, port := "mysql", "3306"
	if IsPostgres(&base.Spec) {
		backend, port = "postgres", "5432"
	}
	return []corev1.EnvVar{
		{Name: prefix + "LISTEN_ADDR", Value: ":9112"},
		{Name: db + "BACKEND", Value: backend},
		{Name: db + "HOST", Value: sqlSvcName},
		{Name: db + "PORT", Value: port},
		{Name: db + "USER", Value: r.Spec.Scheduler.DBUser},
		{Name: db + "PASSWORD", ValueFrom: envFromSecret(sqlSecret, "password")},
		{Name: db + "NAME", Value: r.Spec.Scheduler.DBName},
	}
}
// getAirflowEnv builds the environment for the main airflow container of any
// component StatefulSet: database connection, DAG folder, executor-specific
// settings, redis settings for Celery, and user-supplied config overrides.
// saName is the service-account name given to K8s-executor worker pods.
func getAirflowEnv(r *alpha1.AirflowCluster, saName string, base *alpha1.AirflowBase) []corev1.EnvVar {
	sp := r.Spec
	sqlSvcName := common.RsrcName(sp.AirflowBaseRef.Name, common.ValueAirflowComponentSQL, "")
	sqlSecret := common.RsrcName(r.Name, common.ValueAirflowComponentUI, "")
	schedulerConfigmap := common.RsrcName(r.Name, common.ValueAirflowComponentScheduler, "")
	// Operator-managed redis is only deployed when no GCP MemoryStore is used.
	redisSecret := ""
	redisSvcName := ""
	if sp.MemoryStore == nil {
		redisSecret = common.RsrcName(r.Name, common.ValueAirflowComponentRedis, "")
		redisSvcName = redisSecret
	}
	// DAG folder depends on the sync mechanism (git vs GCS).
	dagFolder := airflowDagsBase
	if sp.DAGs != nil {
		if sp.DAGs.Git != nil {
			dagFolder = airflowDagsBase + gitSyncDestDir + "/" + sp.DAGs.DagSubdir
		} else if sp.DAGs.GCS != nil {
			dagFolder = airflowDagsBase + gCSSyncDestDir + "/" + sp.DAGs.DagSubdir
		}
	}
	dbType := "mysql"
	if IsPostgres(&base.Spec) {
		dbType = "postgres"
	}
	env := []corev1.EnvVar{
		{Name: "EXECUTOR", Value: sp.Executor},
		{Name: "SQL_PASSWORD", ValueFrom: envFromSecret(sqlSecret, "password")},
		{Name: afc + "DAGS_FOLDER", Value: dagFolder},
		{Name: "SQL_HOST", Value: sqlSvcName},
		{Name: "SQL_USER", Value: sp.Scheduler.DBUser},
		{Name: "SQL_DB", Value: sp.Scheduler.DBName},
		{Name: "DB_TYPE", Value: dbType},
	}
	// K8s executor: tell airflow how to spawn worker pods.
	if sp.Executor == alpha1.ExecutorK8s {
		env = append(env, []corev1.EnvVar{
			{Name: afk + "AIRFLOW_CONFIGMAP", Value: schedulerConfigmap},
			{Name: afk + "WORKER_CONTAINER_REPOSITORY", Value: sp.Worker.Image},
			{Name: afk + "WORKER_CONTAINER_TAG", Value: sp.Worker.Version},
			{Name: afk + "WORKER_CONTAINER_IMAGE_PULL_POLICY", Value: "IfNotPresent"},
			{Name: afk + "DELETE_WORKER_PODS", Value: "True"},
			{Name: afk + "NAMESPACE", Value: r.Namespace},
			//{Name: afk+"IMAGE_PULL_SECRETS", Value: s.ImagePullSecrets},
			//{Name: afk+"GCP_SERVICE_ACCOUNT_KEYS", Vaslue: ??},
		}...)
		// Worker pods clone DAGs themselves when git sync is configured.
		if sp.DAGs != nil && sp.DAGs.Git != nil {
			env = append(env, []corev1.EnvVar{
				{Name: afk + "GIT_REPO", Value: sp.DAGs.Git.Repo},
				{Name: afk + "GIT_BRANCH", Value: sp.DAGs.Git.Branch},
				{Name: afk + "GIT_SUBPATH", Value: sp.DAGs.DagSubdir},
				{Name: afk + "GIT_SYNC_DEST", Value: gitSyncDestDir},
				{Name: afk + "WORKER_SERVICE_ACCOUNT_NAME", Value: saName},
				{Name: afk + "GIT_DAGS_FOLDER_MOUNT_POINT", Value: airflowDagsBase},
				// git_sync_root = /git
				// git_sync_dest = repo
			}...)
			if sp.DAGs.Git.CredSecretRef != nil {
				env = append(env, []corev1.EnvVar{
					{Name: "GIT_PASSWORD",
						ValueFrom: envFromSecret(sp.DAGs.Git.CredSecretRef.Name, "password")},
					{Name: "GIT_USER", Value: sp.DAGs.Git.User},
				}...)
			}
		}
		// dags_in_image = False
		// dags_volume_subpath =
		// dags_volume_claim =
	}
	// Celery executor: point the broker at GCP MemoryStore, the operator-managed
	// redis StatefulSet, or a user-provided external redis, in that order.
	if sp.Executor == alpha1.ExecutorCelery {
		if sp.MemoryStore != nil {
			env = append(env,
				[]corev1.EnvVar{
					{Name: "REDIS_HOST", Value: sp.MemoryStore.Status.Host},
					{Name: "REDIS_PORT", Value: strconv.FormatInt(sp.MemoryStore.Status.Port, 10)},
				}...)
		} else if r.Spec.Redis.RedisHost == "" {
			env = append(env,
				[]corev1.EnvVar{
					{Name: "REDIS_PASSWORD",
						ValueFrom: envFromSecret(redisSecret, "password")},
					{Name: "REDIS_HOST", Value: redisSvcName},
				}...)
		} else {
			env = append(env,
				[]corev1.EnvVar{
					{Name: "REDIS_HOST", Value: r.Spec.Redis.RedisHost},
					{Name: "REDIS_PORT", Value: r.Spec.Redis.RedisPort},
				}...)
			// RedisPassword here is a bool flag meaning "external redis requires
			// a password" (read from the redis component secret).
			if r.Spec.Redis.RedisPassword == true {
				env = append(env,
					[]corev1.EnvVar{
						{Name: "REDIS_PASSWORD",
							ValueFrom: envFromSecret(redisSecret, "password")},
					}...)
			}
		}
	}
	// Do sorted key scan. To store the keys in slice in sorted order
	// (map iteration order is random; sorting keeps the env list deterministic
	// so the StatefulSet spec does not churn between reconciles).
	var keys []string
	for k := range sp.Config.AirflowEnv {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		env = append(env, corev1.EnvVar{Name: k, Value: sp.Config.AirflowEnv[k]})
	}
	// Secret-backed user env vars.
	for _, k := range sp.Config.AirflowSecretEnv {
		env = append(env, corev1.EnvVar{Name: k.Env, ValueFrom: envFromSecret(k.Secret, k.Field)})
	}
	return env
}
// --------------- Global Cluster component -------------------------

// Observables lists the kinds observed for the Cluster component.
func (c *Cluster) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
	builder := k8s.NewObservables().WithLabels(labels)
	builder = builder.For(&app.ApplicationList{})
	return builder.Get()
}
// DependentResources - resources the Cluster component depends on.
func (c *Cluster) DependentResources(rsrc interface{}) []reconciler.Object {
	deps := dependantResources(rsrc)
	return deps
}
// Objects returns the list of resource/name for those resources created by
// the operator for this spec and those resources referenced by this operator.
// Mark resources as owned, referred
func (c *Cluster) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
	r := rsrc.(*alpha1.AirflowCluster)
	// Copy the labels and drop the per-component "using" label so the
	// Application selector matches every component of this cluster.
	selectors := make(map[string]string)
	for k, v := range rsrclabels {
		selectors[k] = v
	}
	delete(selectors, gr.LabelUsing)
	ngdata := acTemplateValue(r, dependent, common.ValueAirflowComponentCluster, rsrclabels, selectors, nil)
	ngdata.Expected = aggregated
	return k8s.NewObjects().
		WithValue(ngdata).
		WithTemplate("cluster-application.yaml", &app.ApplicationList{},
			func(o *reconciler.Object, v interface{}) {
				// Decorate the rendered Application with the cluster selector
				// and the group/kinds of all aggregated component objects.
				ao := application.NewApplication(o.Obj.(*k8s.Object).Obj)
				// NOTE(review): reassigning the local parameter o has no effect
				// outside this closure; this only works if SetSelector /
				// SetComponentGK mutate the wrapped object in place — confirm,
				// otherwise *o = *ao.SetSelector(...).Item() is needed.
				o = ao.SetSelector(r.Labels).
					SetComponentGK(aggregated).
					Item()
			}).
		Build()
}
// UpdateStatus use reconciled objects to update component status.
func (c *Cluster) UpdateStatus(rsrc interface{}, reconciled []reconciler.Object, err error) time.Duration {
	st := &rsrc.(*alpha1.AirflowCluster).Status
	ready := st.ComponentMeta.UpdateStatus(reconciler.ObjectsByType(reconciled, k8s.Type))
	st.Meta.UpdateStatus(&ready, err)
	var period time.Duration // zero: no explicit requeue
	return period
}
// ------------------------------ Airflow UI -----------------------------------

// Observables lists the kinds observed for the UI component.
func (s *UI) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
	builder := k8s.NewObservables().WithLabels(labels)
	builder = builder.For(&appsv1.StatefulSetList{})
	builder = builder.For(&corev1.SecretList{})
	return builder.Get()
}
// DependentResources - resources the UI component depends on.
func (s *UI) DependentResources(rsrc interface{}) []reconciler.Object {
	deps := dependantResources(rsrc)
	return deps
}
// Objects builds the UI StatefulSet and its password secret. Nothing is
// created while the UI is disabled or a configured MemoryStore has no host yet.
func (s *UI) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
	r := rsrc.(*alpha1.AirflowCluster)
	switch {
	case r.Spec.UI == nil:
		return []reconciler.Object{}, nil
	case r.Spec.MemoryStore != nil && r.Spec.MemoryStore.Status.Host == "":
		return []reconciler.Object{}, nil
	}
	ngdata := acTemplateValue(r, dependent, common.ValueAirflowComponentUI, rsrclabels, rsrclabels, map[string]string{"web": "8080"})
	password := common.RandomAlphanumericString(16)
	ngdata.Secret = map[string]string{"password": base64.StdEncoding.EncodeToString(password)}
	return k8s.NewObjects().
		WithValue(ngdata).
		WithTemplate("ui-sts.yaml", &appsv1.StatefulSetList{}, s.sts).
		WithTemplate("secret.yaml", &corev1.SecretList{}, reconciler.NoUpdate).
		Build()
}
// sts customizes the UI StatefulSet: resources plus an init container that
// provisions the airflow database user on the base's SQL instance.
func (s *UI) sts(o *reconciler.Object, v interface{}) {
	sts, tv := updateSts(o, v)
	sts.Spec.Template.Spec.Containers[0].Resources = tv.Cluster.Spec.UI.Resources
	if !IsPostgres(&tv.Base.Spec) {
		addMySQLUserDBContainer(tv.Cluster, sts)
		return
	}
	addPostgresUserDBContainer(tv.Cluster, sts)
}
// ------------------------------ RedisSpec ---------------------------------------

// sts customizes the redis StatefulSet with the configured resources and
// optional volume claim template.
func (s Redis) sts(o *reconciler.Object, v interface{}) {
	tv := v.(*common.TemplateValue)
	sts := o.Obj.(*k8s.Object).Obj.(*appsv1.StatefulSet)
	redisSpec := tv.Cluster.Spec.Redis
	sts.Spec.Template.Spec.Containers[0].Resources = redisSpec.Resources
	if vct := redisSpec.VolumeClaimTemplate; vct != nil {
		sts.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{*vct}
	}
}
// Observables lists the kinds observed for the Redis component.
func (s *Redis) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
	builder := k8s.NewObservables().WithLabels(labels)
	builder = builder.For(&appsv1.StatefulSetList{})
	builder = builder.For(&corev1.SecretList{})
	builder = builder.For(&policyv1.PodDisruptionBudgetList{})
	builder = builder.For(&corev1.ServiceList{})
	return builder.Get()
}
// DependentResources - resources the Redis component depends on.
func (s *Redis) DependentResources(rsrc interface{}) []reconciler.Object {
	deps := dependantResources(rsrc)
	return deps
}
// Objects builds the operator-managed redis StatefulSet, secret, PDB and
// service. Nothing is created when redis is disabled or an external redis
// host is configured.
func (s *Redis) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
	r := rsrc.(*alpha1.AirflowCluster)
	if r.Spec.Redis == nil || r.Spec.Redis.RedisHost != "" {
		return []reconciler.Object{}, nil
	}
	ngdata := acTemplateValue(r, dependent, common.ValueAirflowComponentRedis, rsrclabels, rsrclabels, map[string]string{"redis": "6379"})
	password := common.RandomAlphanumericString(16)
	ngdata.Secret = map[string]string{"password": base64.StdEncoding.EncodeToString(password)}
	ngdata.PDBMinAvail = "100%"
	return k8s.NewObjects().
		WithValue(ngdata).
		WithTemplate("redis-sts.yaml", &appsv1.StatefulSetList{}, s.sts).
		WithTemplate("secret.yaml", &corev1.SecretList{}, reconciler.NoUpdate).
		WithTemplate("pdb.yaml", &policyv1.PodDisruptionBudgetList{}).
		WithTemplate("svc.yaml", &corev1.ServiceList{}).
		Build()
}
// ------------------------------ Scheduler ---------------------------------------

// gcsContainer builds the GCS DAG-sync container. The boolean result is true
// when the sync is one-shot (run as an init container).
func gcsContainer(s *alpha1.GCSSpec, volName string) (bool, corev1.Container) {
	c := corev1.Container{
		Name:  "gcs-syncd",
		Image: alpha1.GCSsyncImage + ":" + alpha1.GCSsyncVersion,
		Env: []corev1.EnvVar{
			{Name: "GCS_BUCKET", Value: s.Bucket},
		},
		Args: []string{"/home/airflow/gcs"},
		VolumeMounts: []corev1.VolumeMount{
			{Name: volName, MountPath: "/home/airflow/gcs"},
		},
	}
	return s.Once, c
}
// gitContainer builds the git-sync DAG container, wiring optional credentials
// from the referenced secret. The boolean result is true when the sync is
// one-shot (run as an init container).
func gitContainer(s *alpha1.GitSpec, volName string) (bool, corev1.Container) {
	env := []corev1.EnvVar{
		{Name: "GIT_SYNC_REPO", Value: s.Repo},
		{Name: "GIT_SYNC_DEST", Value: gitSyncDestDir},
		{Name: "GIT_SYNC_BRANCH", Value: s.Branch},
		{Name: "GIT_SYNC_ONE_TIME", Value: strconv.FormatBool(s.Once)},
		{Name: "GIT_SYNC_REV", Value: s.Rev},
	}
	if ref := s.CredSecretRef; ref != nil {
		env = append(env,
			corev1.EnvVar{Name: "GIT_SYNC_PASSWORD", ValueFrom: envFromSecret(ref.Name, "password")},
			corev1.EnvVar{Name: "GIT_SYNC_USERNAME", Value: s.User},
		)
	}
	c := corev1.Container{
		Name:    "git-sync",
		Image:   alpha1.GitsyncImage + ":" + alpha1.GitsyncVersion,
		Env:     env,
		Command: []string{"/git-sync"},
		Ports: []corev1.ContainerPort{
			{Name: "gitsync", ContainerPort: 2020},
		},
		VolumeMounts: []corev1.VolumeMount{
			{Name: volName, MountPath: "/tmp/git"},
		},
	}
	return s.Once, c
}
// dagContainer dispatches to the git or GCS sync container builder; when
// neither source is configured it returns an empty container and false.
func dagContainer(s *alpha1.DagSpec, volName string) (bool, corev1.Container) {
	switch {
	case s.Git != nil:
		return gitContainer(s.Git, volName)
	case s.GCS != nil:
		return gcsContainer(s.GCS, volName)
	default:
		return false, corev1.Container{}
	}
}
// sts customizes the scheduler StatefulSet. With the K8s executor the pod
// runs under a dedicated service account (same name as the sts) so it can
// manage spawned worker pods.
// NOTE(review): assumes Containers[1] is the prometheus exporter sidecar
// declared in scheduler-sts.yaml — confirm template ordering.
func (s *Scheduler) sts(o *reconciler.Object, v interface{}) {
	sts, r := updateSts(o, v)
	if r.Cluster.Spec.Executor == alpha1.ExecutorK8s {
		sts.Spec.Template.Spec.ServiceAccountName = sts.Name
	}
	sts.Spec.Template.Spec.Containers[0].Resources = r.Cluster.Spec.Scheduler.Resources
	sts.Spec.Template.Spec.Containers[1].Env = getAirflowPrometheusEnv(r.Cluster, r.Base)
}
// DependentResources - the AirflowBase, plus (for the K8s executor) the SQL
// password secret needed to render the airflow configmap.
func (s *Scheduler) DependentResources(rsrc interface{}) []reconciler.Object {
	r := rsrc.(*alpha1.AirflowCluster)
	resources := dependantResources(rsrc)
	if r.Spec.Executor != alpha1.ExecutorK8s {
		return resources
	}
	sqlSecret := common.RsrcName(r.Name, common.ValueAirflowComponentUI, "")
	return append(resources, k8s.ReferredItem(&corev1.Secret{}, sqlSecret, r.Namespace))
}
// Observables lists the kinds observed for the Scheduler component.
func (s *Scheduler) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
	builder := k8s.NewObservables().WithLabels(labels)
	builder = builder.For(&appsv1.StatefulSetList{})
	builder = builder.For(&corev1.ConfigMapList{})
	builder = builder.For(&corev1.ServiceAccountList{})
	builder = builder.For(&rbacv1.RoleBindingList{})
	return builder.Get()
}
// Objects returns the list of resource/name for those resources created by
// the operator for this spec and those resources referenced by this operator.
// Mark resources as owned, referred
func (s *Scheduler) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
	r := rsrc.(*alpha1.AirflowCluster)
	if r.Spec.Scheduler == nil {
		return []reconciler.Object{}, nil
	}
	// Defer creation until a configured MemoryStore has a host assigned.
	if r.Spec.MemoryStore != nil && r.Spec.MemoryStore.Status.Host == "" {
		return []reconciler.Object{}, nil
	}
	b := k8s.GetItem(dependent, &alpha1.AirflowBase{}, r.Spec.AirflowBaseRef.Name, r.Namespace)
	base := b.(*alpha1.AirflowBase)
	bag := k8s.NewObjects()
	// Reference (not own) the git credentials secret when DAGs come from git.
	if r.Spec.DAGs != nil {
		git := r.Spec.DAGs.Git
		if git != nil && git.CredSecretRef != nil {
			bag.WithReferredItem(&corev1.Secret{}, git.CredSecretRef.Name, r.Namespace)
		}
	}
	ngdata := acTemplateValue(r, dependent, common.ValueAirflowComponentScheduler, rsrclabels, rsrclabels, nil)
	bag.WithValue(ngdata).WithFolder("templates/")
	// The K8s executor needs an airflow configmap carrying the SQLAlchemy
	// connection string so spawned worker pods can reach the database.
	if r.Spec.Executor == alpha1.ExecutorK8s {
		sqlSvcName := common.RsrcName(r.Spec.AirflowBaseRef.Name, common.ValueAirflowComponentSQL, "")
		sqlSecret := common.RsrcName(r.Name, common.ValueAirflowComponentUI, "")
		se := k8s.GetItem(dependent, &corev1.Secret{}, sqlSecret, r.Namespace)
		secret := se.(*corev1.Secret)
		dbPrefix := "mysql"
		port := "3306"
		// NOTE(review): this checks Spec.Postgres only, while other paths use
		// IsPostgres (which also covers a postgres-type SQL proxy) — confirm
		// whether a proxy setup should also get the postgresql prefix here.
		if base.Spec.Postgres != nil {
			dbPrefix = "postgresql+psycopg2"
			port = "5432"
		}
		conn := dbPrefix + "://" + r.Spec.Scheduler.DBUser + ":" + string(secret.Data["password"]) + "@" + sqlSvcName + ":" + port + "/" + r.Spec.Scheduler.DBName
		ngdata.SQLConn = conn
		bag.WithTemplate("airflow-configmap.yaml", &corev1.ConfigMapList{})
	}
	return bag.WithTemplate("scheduler-sts.yaml", &appsv1.StatefulSetList{}, s.sts).
		WithTemplate("serviceaccount.yaml", &corev1.ServiceAccountList{}, reconciler.NoUpdate).
		WithTemplate("rolebinding.yaml", &rbacv1.RoleBindingList{}).
		Build()
}
// ------------------------------ Worker ----------------------------------------

// sts applies the worker resource requirements on top of the shared
// StatefulSet mutations.
func (s *Worker) sts(o *reconciler.Object, v interface{}) {
	set, tv := updateSts(o, v)
	set.Spec.Template.Spec.Containers[0].Resources = tv.Cluster.Spec.Worker.Resources
}
// Observables lists the kinds observed for the Worker component.
func (s *Worker) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
	builder := k8s.NewObservables().WithLabels(labels)
	builder = builder.For(&appsv1.StatefulSetList{})
	builder = builder.For(&corev1.ServiceList{})
	return builder.Get()
}
// DependentResources - resources the Worker component depends on.
func (s *Worker) DependentResources(rsrc interface{}) []reconciler.Object {
	deps := dependantResources(rsrc)
	return deps
}
// Objects builds the worker StatefulSet and its headless log service.
// Nothing is created while workers are disabled or a configured MemoryStore
// has no host yet.
func (s *Worker) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
	r := rsrc.(*alpha1.AirflowCluster)
	switch {
	case r.Spec.Worker == nil:
		return []reconciler.Object{}, nil
	case r.Spec.MemoryStore != nil && r.Spec.MemoryStore.Status.Host == "":
		return []reconciler.Object{}, nil
	}
	ngdata := acTemplateValue(r, dependent, common.ValueAirflowComponentWorker, rsrclabels, rsrclabels, map[string]string{"wlog": "8793"})
	return k8s.NewObjects().
		WithValue(ngdata).
		WithTemplate("worker-sts.yaml", &appsv1.StatefulSetList{}, s.sts).
		WithTemplate("headlesssvc.yaml", &corev1.ServiceList{}).
		Build()
}
// ------------------------------ Flower ---------------------------------------

// Observables lists the kinds observed for the Flower component.
func (s *Flower) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
	builder := k8s.NewObservables().WithLabels(labels)
	builder = builder.For(&appsv1.StatefulSetList{})
	return builder.Get()
}
// DependentResources - resources the Flower component depends on.
func (s *Flower) DependentResources(rsrc interface{}) []reconciler.Object {
	deps := dependantResources(rsrc)
	return deps
}
// Objects builds the flower StatefulSet. Nothing is created while flower is
// disabled or a configured MemoryStore has no host yet.
func (s *Flower) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
	r := rsrc.(*alpha1.AirflowCluster)
	switch {
	case r.Spec.Flower == nil:
		return []reconciler.Object{}, nil
	case r.Spec.MemoryStore != nil && r.Spec.MemoryStore.Status.Host == "":
		return []reconciler.Object{}, nil
	}
	ngdata := acTemplateValue(r, dependent, common.ValueAirflowComponentFlower, rsrclabels, rsrclabels, map[string]string{"flower": "5555"})
	return k8s.NewObjects().
		WithValue(ngdata).
		WithTemplate("flower-sts.yaml", &appsv1.StatefulSetList{}, s.sts).
		Build()
}
// sts applies the flower resource requirements on top of the shared
// StatefulSet mutations.
func (s *Flower) sts(o *reconciler.Object, v interface{}) {
	set, tv := updateSts(o, v)
	set.Spec.Template.Spec.Containers[0].Resources = tv.Cluster.Spec.Flower.Resources
}
// ------------------------------ MemoryStore ---------------------------------------

// DependentResources - resources the MemoryStore component depends on.
func (s *MemoryStore) DependentResources(rsrc interface{}) []reconciler.Object {
	deps := dependantResources(rsrc)
	return deps
}
// Observables returns the GCP Cloud Memorystore observable for this cluster,
// or nothing when no MemoryStore is configured or the parent path cannot be
// resolved.
func (s *MemoryStore) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
	r := rsrc.(*alpha1.AirflowCluster)
	if r.Spec.MemoryStore == nil {
		return []reconciler.Observable{}
	}
	parentPath, err := redis.GetParent(r.Spec.MemoryStore.Project, r.Spec.MemoryStore.Region)
	if err != nil {
		// TODO: surface this error instead of observing nothing.
		return []reconciler.Observable{}
	}
	return []reconciler.Observable{redis.NewObservable(labels, parentPath)}
}
// Objects - returns the GCP Cloud Memorystore (redis) instance object for the
// cluster, configured from Spec.MemoryStore. Returns nothing when the
// MemoryStore is not configured.
func (s *MemoryStore) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
	r := rsrc.(*alpha1.AirflowCluster)
	if r.Spec.MemoryStore == nil {
		return []reconciler.Object{}, nil
	}
	parent, err := redis.GetParent(r.Spec.MemoryStore.Project, r.Spec.MemoryStore.Region)
	if err != nil {
		return []reconciler.Object{}, err
	}
	bag, err := gcp.NewObjects().
		WithLabels(rsrclabels).
		Add(redis.NewObject(parent, r.Name+"-redis")).
		Build()
	if err != nil {
		return []reconciler.Object{}, err
	}
	// Populate the wrapped GCP redis instance from the spec.
	robj := bag[0].Obj.(*redis.Object).Redis
	robj.AlternativeLocationId = r.Spec.MemoryStore.AlternativeLocationID
	robj.AuthorizedNetwork = r.Spec.MemoryStore.AuthorizedNetwork
	robj.DisplayName = r.Name + "-redis"
	// Optional redis configs; the map is created lazily on first use.
	if r.Spec.MemoryStore.NotifyKeyspaceEvents != "" {
		if robj.RedisConfigs == nil {
			robj.RedisConfigs = make(map[string]string)
		}
		robj.RedisConfigs["notify-keyspace-events"] = r.Spec.MemoryStore.NotifyKeyspaceEvents
	}
	if r.Spec.MemoryStore.MaxMemoryPolicy != "" {
		if robj.RedisConfigs == nil {
			robj.RedisConfigs = make(map[string]string)
		}
		robj.RedisConfigs["maxmemory-policy"] = r.Spec.MemoryStore.MaxMemoryPolicy
	}
	robj.RedisVersion = r.Spec.MemoryStore.RedisVersion
	robj.MemorySizeGb = int64(r.Spec.MemoryStore.MemorySizeGb)
	// The GCP API expects the tier in upper case (e.g. BASIC, STANDARD_HA).
	robj.Tier = strings.ToUpper(r.Spec.MemoryStore.Tier)
	return bag, nil
}
// UpdateStatus - copies the reconciled GCP redis instance state into the
// MemoryStore status block and returns a 30s requeue period while the
// instance is still provisioning (or not yet observed).
// NOTE(review): status is stored under Spec.MemoryStore.Status rather than
// the cluster's Status block — confirm this placement is intentional.
func (s *MemoryStore) UpdateStatus(rsrc interface{}, reconciled []reconciler.Object, err error) time.Duration {
	var period time.Duration
	r := rsrc.(*alpha1.AirflowCluster)
	if r.Spec.MemoryStore == nil {
		return period
	}
	stts := &r.Spec.MemoryStore.Status
	ready := false
	if len(reconciled) != 0 {
		instance := reconciled[0].Obj.(*redis.Object).Redis
		stts.CreateTime = instance.CreateTime
		stts.CurrentLocationID = instance.CurrentLocationId
		stts.Host = instance.Host
		stts.Port = instance.Port
		stts.State = instance.State
		// Keep polling until the instance reaches a stable state.
		if instance.State != "READY" && instance.State != "MAINTENANCE" {
			period = time.Second * 30
		}
		stts.StatusMessage = instance.StatusMessage
		ready = true
		stts.Meta.UpdateStatus(&ready, err)
	} else {
		// Nothing observed yet: not ready, retry shortly.
		period = time.Second * 30
		stts.Meta.UpdateStatus(&ready, err)
	}
	return period
}
// Differs returns true if the resource needs to be updated.
// Always true for now: the GCP redis instance is re-applied on every
// reconcile instead of being diffed field by field.
func (s *MemoryStore) Differs(expected reconciler.Object, observed reconciler.Object) bool {
	return true //differs(expected, observed)
}
// Finalize - finalizes MemoryStore component when it is deleted.
// While GCP redis instances are still observed, it keeps the cleanup
// finalizer on the cluster and marks each observed object for deletion;
// once nothing remains, the finalizer is removed so deletion can proceed.
func (s *MemoryStore) Finalize(rsrc interface{}, observed, dependent []reconciler.Object) error {
	r := rsrc.(*alpha1.AirflowCluster)
	if r.Spec.MemoryStore == nil {
		return nil
	}
	obj := r.Spec.MemoryStore
	obj.Status.NotReady("Finalizing", "Finalizing in progress")
	if len(observed) != 0 {
		finalizer.Add(r, finalizer.Cleanup)
		// Flag every observed GCP object for deletion by the reconciler.
		items := observed
		for i := range items {
			items[i].Delete = true
		}
		obj.Status.SetCondition(status.Cleanup, "InProgress", "Items pending deletion")
	} else {
		finalizer.Remove(r, finalizer.Cleanup)
	}
	return nil
}