blob: d451a095d647b8cfa48e1afc8215196068c1834d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controllers
// Major:
// TODO retry.Retry
//
// Minor:
// TODO reconcile based on hash(spec)
// TODO validation: assume resources and volume claims are validated by api server ?
// TODO parameterize controller using config maps for default images, versions, resources etc
// TODO documentation for CRD spec
import (
"context"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"encoding/base64"
alpha1 "github.com/apache/airflow-on-k8s-operator/api/v1alpha1"
"github.com/apache/airflow-on-k8s-operator/controllers/application"
"github.com/apache/airflow-on-k8s-operator/controllers/common"
app "github.com/kubernetes-sigs/application/pkg/apis/app/v1beta1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
gr "sigs.k8s.io/controller-reconciler/pkg/genericreconciler"
"sigs.k8s.io/controller-reconciler/pkg/reconciler"
"sigs.k8s.io/controller-reconciler/pkg/reconciler/manager/k8s"
"sigs.k8s.io/controller-runtime/pkg/manager"
"time"
)
// AirflowBaseReconciler reconciles a AirflowBase object
type AirflowBaseReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=airflow.apache.org,resources=airflowbases,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=airflow.apache.org,resources=airflowbases/status,verbs=get;update;patch
// Reconcile - Dummy TODO remove this
func (r *AirflowBaseReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
_ = context.Background()
_ = r.Log.WithValues("airflowbase", req.NamespacedName)
// your logic here
return ctrl.Result{}, nil
}
// SetupWithManager - called by main
func (r *AirflowBaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
_ = app.AddToScheme(r.Scheme)
return ctrl.NewControllerManagedBy(mgr).
For(&alpha1.AirflowBase{}).
Complete(abReconciler(mgr))
}
func abReconciler(mgr manager.Manager) *gr.Reconciler {
return gr.
WithManager(mgr).
For(&alpha1.AirflowBase{}, alpha1.GroupVersion).
Using(&MySQL{}).
Using(&Postgres{}).
Using(&SQLProxy{}).
Using(&NFS{}).
Using(&AirflowBase{}).
WithErrorHandler(abHandleError).
WithValidator(validate).
WithDefaulter(applyDefaults).
Build()
}
func abHandleError(resource interface{}, err error, kind string) {
ab := resource.(*alpha1.AirflowBase)
if err != nil {
ab.Status.SetError("ErrorSeen", err.Error())
} else {
ab.Status.ClearError()
}
}
func validate(resource interface{}) error {
ab := resource.(*alpha1.AirflowBase)
return ab.Validate()
}
func applyDefaults(resource interface{}) {
ab := resource.(*alpha1.AirflowBase)
ab.ApplyDefaults()
}
// AirflowBase - interface to handle airflowbase
type AirflowBase struct{}
// MySQL - interface to handle redis
type MySQL struct{}
// Postgres - interface to handle flower
type Postgres struct{}
// SQLProxy - interface to handle scheduler
type SQLProxy struct{}
// NFS - interface to handle worker
type NFS struct{}
// =-------------------------- common ------------------------------------
func templateValue(r *alpha1.AirflowBase, component, altcomponent string, label, selector, ports map[string]string) *common.TemplateValue {
if altcomponent == "" {
altcomponent = component
}
return &common.TemplateValue{
Name: common.RsrcName(r.Name, component, ""),
Namespace: r.Namespace,
SecretName: common.RsrcName(r.Name, altcomponent, ""),
SvcName: common.RsrcName(r.Name, altcomponent, ""),
Base: r,
Labels: label,
Selector: selector,
Ports: ports,
}
}
// updateStatus use reconciled objects to update component status
func updateStatus(rsrc interface{}, reconciled []reconciler.Object, err error) time.Duration {
var period time.Duration
stts := &rsrc.(*alpha1.AirflowBase).Status
ready := stts.ComponentMeta.UpdateStatus(reconciler.ObjectsByType(reconciled, k8s.Type))
stts.Meta.UpdateStatus(&ready, err)
return period
}
// ------------------------------ MYSQL ---------------------------------------
func (s *MySQL) sts(o *reconciler.Object, v interface{}) {
r := v.(*common.TemplateValue)
sts := o.Obj.(*k8s.Object).Obj.(*appsv1.StatefulSet)
sts.Spec.Template.Spec.Containers[0].Resources = r.Base.Spec.MySQL.Resources
if r.Base.Spec.MySQL.VolumeClaimTemplate != nil {
sts.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{*r.Base.Spec.MySQL.VolumeClaimTemplate}
}
}
// Observables asd
func (s *MySQL) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
return k8s.NewObservables().
WithLabels(labels).
For(&appsv1.StatefulSetList{}).
For(&corev1.SecretList{}).
For(&policyv1.PodDisruptionBudgetList{}).
For(&corev1.ServiceList{}).
Get()
}
// Objects returns the list of resource/name for those resources created by
// the operator for this spec and those resources referenced by this operator.
// Mark resources as owned, referred
func (s *MySQL) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
r := rsrc.(*alpha1.AirflowBase)
if r.Spec.MySQL == nil {
return []reconciler.Object{}, nil
}
ngdata := templateValue(r, common.ValueAirflowComponentMySQL, common.ValueAirflowComponentSQL, rsrclabels, rsrclabels, map[string]string{"mysql": "3306"})
ngdata.Secret = map[string]string{
"password": base64.StdEncoding.EncodeToString(common.RandomAlphanumericString(16)),
"rootpassword": base64.StdEncoding.EncodeToString(common.RandomAlphanumericString(16)),
}
ngdata.PDBMinAvail = "100%"
return k8s.NewObjects().
WithValue(ngdata).
WithTemplate("mysql-sts.yaml", &appsv1.StatefulSetList{}, s.sts).
WithTemplate("secret.yaml", &corev1.SecretList{}, reconciler.NoUpdate).
WithTemplate("pdb.yaml", &policyv1.PodDisruptionBudgetList{}).
WithTemplate("svc.yaml", &corev1.ServiceList{}).
Build()
}
// UpdateStatus use reconciled objects to update component status
func (s *MySQL) UpdateStatus(rsrc interface{}, reconciled []reconciler.Object, err error) time.Duration {
return updateStatus(rsrc, reconciled, err)
}
// ------------------------------ POSTGRES ---------------------------------------
func (s *Postgres) sts(o *reconciler.Object, v interface{}) {
r := v.(*common.TemplateValue)
sts := o.Obj.(*k8s.Object).Obj.(*appsv1.StatefulSet)
sts.Spec.Template.Spec.Containers[0].Resources = r.Base.Spec.Postgres.Resources
if r.Base.Spec.Postgres.VolumeClaimTemplate != nil {
sts.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{*r.Base.Spec.Postgres.VolumeClaimTemplate}
}
}
// Observables asd
func (s *Postgres) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
return k8s.NewObservables().
WithLabels(labels).
For(&appsv1.StatefulSetList{}).
For(&corev1.SecretList{}).
For(&policyv1.PodDisruptionBudgetList{}).
For(&corev1.ServiceList{}).
Get()
}
// Objects returns the list of resource/name for those resources created by
// the operator for this spec and those resources referenced by this operator.
// Mark resources as owned, referred
func (s *Postgres) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
r := rsrc.(*alpha1.AirflowBase)
if r.Spec.Postgres == nil {
return []reconciler.Object{}, nil
}
ngdata := templateValue(r, common.ValueAirflowComponentPostgres, common.ValueAirflowComponentSQL, rsrclabels, rsrclabels, map[string]string{"postgres": "5432"})
ngdata.Secret = map[string]string{
"password": base64.StdEncoding.EncodeToString(common.RandomAlphanumericString(16)),
"rootpassword": base64.StdEncoding.EncodeToString(common.RandomAlphanumericString(16)),
}
ngdata.PDBMinAvail = "100%"
return k8s.NewObjects().
WithValue(ngdata).
WithTemplate("postgres-sts.yaml", &appsv1.StatefulSetList{}, s.sts).
WithTemplate("secret.yaml", &corev1.SecretList{}, reconciler.NoUpdate).
WithTemplate("pdb.yaml", &policyv1.PodDisruptionBudgetList{}).
WithTemplate("svc.yaml", &corev1.ServiceList{}).
Build()
}
// UpdateStatus use reconciled objects to update component status
func (s *Postgres) UpdateStatus(rsrc interface{}, reconciled []reconciler.Object, err error) time.Duration {
return updateStatus(rsrc, reconciled, err)
}
// ------------------------------ NFSStoreSpec ---------------------------------------
// Observables asd
func (s *NFS) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
return k8s.NewObservables().
WithLabels(labels).
For(&appsv1.StatefulSetList{}).
For(&policyv1.PodDisruptionBudgetList{}).
For(&corev1.ServiceList{}).
Get()
}
// Objects returns the list of resource/name for those resources created by
func (s *NFS) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
r := rsrc.(*alpha1.AirflowBase)
if r.Spec.Storage == nil {
return []reconciler.Object{}, nil
}
ngdata := templateValue(r, common.ValueAirflowComponentNFS, "", rsrclabels, rsrclabels, map[string]string{"nfs": "2049", "mountd": "20048", "rpcbind": "111"})
ngdata.PDBMinAvail = "100%"
return k8s.NewObjects().
WithValue(ngdata).
WithTemplate("nfs-sts.yaml", &appsv1.StatefulSetList{}, s.sts).
WithTemplate("pdb.yaml", &policyv1.PodDisruptionBudgetList{}).
WithTemplate("svc.yaml", &corev1.ServiceList{}).
Build()
}
func (s *NFS) sts(o *reconciler.Object, v interface{}) {
r := v.(*common.TemplateValue)
sts := o.Obj.(*k8s.Object).Obj.(*appsv1.StatefulSet)
sts.Spec.Template.Spec.Containers[0].Resources = r.Base.Spec.Storage.Resources
if r.Base.Spec.Storage.Volume != nil {
sts.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{*r.Base.Spec.Storage.Volume}
}
}
// UpdateStatus use reconciled objects to update component status
func (s *NFS) UpdateStatus(rsrc interface{}, reconciled []reconciler.Object, err error) time.Duration {
return updateStatus(rsrc, reconciled, err)
}
// ------------------------------ SQLProxy ---------------------------------------
// Observables asd
func (s *SQLProxy) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
return k8s.NewObservables().
WithLabels(labels).
For(&appsv1.StatefulSetList{}).
For(&corev1.ServiceList{}).
Get()
}
// Objects returns the list of resource/name for those resources created by
// the operator for this spec and those resources referenced by this operator.
// Mark resources as owned, referred
func (s *SQLProxy) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
r := rsrc.(*alpha1.AirflowBase)
if r.Spec.SQLProxy == nil {
return []reconciler.Object{}, nil
}
sqlname := common.RsrcName(r.Name, common.ValueAirflowComponentSQL, "")
port := "3306"
if r.Spec.SQLProxy.Type == common.ValueSQLProxyTypePostgres {
port = "5432"
}
ngdata := templateValue(r, common.ValueAirflowComponentSQLProxy, "", rsrclabels, rsrclabels, map[string]string{"sqlproxy": port})
ngdata.SvcName = sqlname
return k8s.NewObjects().
WithValue(ngdata).
WithFolder("templates/").
WithTemplate("sqlproxy-sts.yaml", &appsv1.StatefulSetList{}).
WithTemplate("svc.yaml", &corev1.ServiceList{}).
WithReferredItem(&corev1.Secret{}, sqlname, r.Namespace).
Build()
}
// UpdateStatus use reconciled objects to update component status
func (s *SQLProxy) UpdateStatus(rsrc interface{}, reconciled []reconciler.Object, err error) time.Duration {
return updateStatus(rsrc, reconciled, err)
}
// ---------------- Global AirflowBase component -------------------------
// Observables asd
func (s *AirflowBase) Observables(rsrc interface{}, labels map[string]string, dependent []reconciler.Object) []reconciler.Observable {
return k8s.NewObservables().
WithLabels(labels).
For(&app.ApplicationList{}).
Get()
}
// Objects returns the list of resource/name for those resources created by
// the operator for this spec and those resources referenced by this operator.
// Mark resources as owned, referred
func (s *AirflowBase) Objects(rsrc interface{}, rsrclabels map[string]string, observed, dependent, aggregated []reconciler.Object) ([]reconciler.Object, error) {
r := rsrc.(*alpha1.AirflowBase)
selectors := make(map[string]string)
for k, v := range rsrclabels {
selectors[k] = v
}
delete(selectors, gr.LabelUsing)
ngdata := templateValue(r, common.ValueAirflowComponentBase, "", rsrclabels, selectors, nil)
ngdata.Expected = aggregated
return k8s.NewObjects().
WithValue(ngdata).
WithTemplate("base-application.yaml", &app.ApplicationList{},
func(o *reconciler.Object, v interface{}) {
ao := application.NewApplication(o.Obj.(*k8s.Object).Obj)
o = ao.SetSelector(r.Labels).
SetComponentGK(aggregated).
Item()
}).
Build()
}
// UpdateStatus use reconciled objects to update component status
func (s *AirflowBase) UpdateStatus(rsrc interface{}, reconciled []reconciler.Object, err error) time.Duration {
return updateStatus(rsrc, reconciled, err)
}