// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v1alpha1
import (
"math/rand"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"sigs.k8s.io/controller-reconciler/pkg/finalizer"
"sigs.k8s.io/controller-reconciler/pkg/status"
)
// defaults and constant strings
const (
PasswordCharNumSpace = "abcdefghijklmnopqrstuvwxyz0123456789"
PasswordCharSpace = "abcdefghijklmnopqrstuvwxyz"
defaultRedisImage = "redis"
defaultRedisVersion = "4.0"
defaultRedisPort = "6379"
defaultWorkerImage = "gcr.io/airflow-operator/airflow"
defaultSchedulerImage = "gcr.io/airflow-operator/airflow"
defaultFlowerImage = "gcr.io/airflow-operator/airflow"
GitsyncImage = "gcr.io/google_containers/git-sync"
GitsyncVersion = "v3.0.1"
GCSsyncImage = "gcr.io/cloud-airflow-releaser/gcs-syncd"
GCSsyncVersion = "cloud_composer_service_2018-05-23-RC0"
ExecutorLocal = "Local"
ExecutorCelery = "Celery"
ExecutorSequential = "Sequential"
ExecutorK8s = "Kubernetes"
defaultExecutor = ExecutorLocal
defaultBranch = "master"
defaultWorkerVersion = "1.10.2"
defaultSchedulerVersion = "1.10.2"
)
var (
random = rand.New(rand.NewSource(time.Now().UnixNano()))
)
// RandomAlphanumericString generates a random lowercase alphanumeric string of the given length; the first character is always a letter.
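// An illustrative call (the actual bytes vary on every invocation):
//
//	name := RandomAlphanumericString(16) // e.g. []byte("kqvxaxbcdeafghij")
//
// Forcing the leading byte to be a letter keeps the result usable where
// identifiers must not start with a digit, such as the generated database
// name and user below.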
func RandomAlphanumericString(strlen int) []byte {
result := make([]byte, strlen)
for i := range result {
result[i] = PasswordCharNumSpace[random.Intn(len(PasswordCharNumSpace))]
}
result[0] = PasswordCharSpace[random.Intn(len(PasswordCharSpace))]
return result
}
var allowedExecutors = []string{ExecutorLocal, ExecutorSequential, ExecutorCelery, ExecutorK8s}
// MemoryStoreSpec defines the attributes and desired state of MemoryStore component
type MemoryStoreSpec struct {
// Project defines the GCP project of the MemoryStore instance
Project string `json:"project"`
// Region defines the region of the MemoryStore instance
Region string `json:"region"`
// AlternativeLocationID - alternative zone for failover (standard/HA tier only)
// +optional
AlternativeLocationID string `json:"alternativeLocationId,omitempty"`
// AuthorizedNetwork - the full name of the network to which the instance is connected
// +optional
AuthorizedNetwork string `json:"authorizedNetwork,omitempty"`
// LocationID is the zone where the instance will be provisioned
// +optional
LocationID string `json:"locationId,omitempty"`
// MemorySizeGb: Required. Redis memory size in GiB.
MemorySizeGb int64 `json:"memorySizeGb,omitempty"`
// RedisConfigs: Optional. Redis configuration parameters
RedisConfigs map[string]string `json:"redisConfigs,omitempty"`
// RedisVersion: Optional. The version of Redis software.
RedisVersion string `json:"redisVersion,omitempty"`
// Tier: Required. The service tier of the instance.
Tier string `json:"tier,omitempty"`
// Specifies the behavior Redis follows when the memory size limit is reached.
MaxMemoryPolicy string `json:"maxMemoryPolicy,omitempty"`
// Allows clients to subscribe to notifications on certain keyspace events
NotifyKeyspaceEvents string `json:"notifyKeyspaceEvents,omitempty"`
// Status of the MemoryStore instance
Status MemoryStoreStatus `json:"status,omitempty"`
}
func (s *MemoryStoreSpec) validate(fp *field.Path) field.ErrorList {
errs := field.ErrorList{}
if s == nil {
return errs
}
if s.Project == "" {
errs = append(errs, field.Required(fp.Child("project"), "Missing memoryStore Project"))
}
if s.Region == "" {
errs = append(errs, field.Required(fp.Child("region"), "Missing memoryStore Region"))
}
var allowedMaxMemoryPolicy = []string{"", "noeviction", "allkeys-lru", "volatile-lru", "allkeys-random", "volatile-random", "volatile-ttl"}
allowedMMP := false
for _, policy := range allowedMaxMemoryPolicy {
if policy == s.MaxMemoryPolicy {
allowedMMP = true
}
}
if !allowedMMP {
errs = append(errs, field.NotSupported(fp.Child("maxMemoryPolicy"), s.MaxMemoryPolicy, allowedMaxMemoryPolicy))
}
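// Single-character flags understood by Redis notify-keyspace-events:
// K keyspace events, E keyevent events, g generic commands, $ strings,
// l lists, s sets, h hashes, z sorted sets, x expirations, e evictions,
// A alias for "g$lshzxe". Note that this check accepts only one flag
// (or empty); combinations such as "Kx" are rejected.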
var allowedNotifyKeyspaceEvents = []string{"", "K", "E", "g", "$", "l", "s", "h", "z", "x", "e", "A"}
allowedNKE := false
for _, event := range allowedNotifyKeyspaceEvents {
if event == s.NotifyKeyspaceEvents {
allowedNKE = true
}
}
if !allowedNKE {
errs = append(errs, field.Invalid(fp.Child("notifyKeyspaceEvent"), "", "Configuration is not allowed"))
}
return errs
}
// RedisSpec defines the attributes and desired state of Redis component
type RedisSpec struct {
// Image defines the Redis Docker image name
// +optional
Image string `json:"image,omitempty"`
// Version defines the Redis Docker image version.
// +optional
Version string `json:"version,omitempty"`
// Operator, when true, generates a RedisReplica custom resource to be handled by a Redis operator.
// When false, a StatefulSet with 1 replica is created
// +optional
Operator bool `json:"operator,omitempty"`
// Hostname or IP of existing Redis instance
RedisHost string `json:"redisHost,omitempty"`
// Port of existing Redis instance
RedisPort string `json:"redisPort,omitempty"`
// RedisPassword indicates whether the existing Redis instance uses a password (MemoryStore does not support passwords yet)
RedisPassword bool `json:"redisPassword,omitempty"`
// Resources is the resource requests and limits for the pods.
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
// VolumeClaimTemplate allows a user to specify a volume claim for Redis data files
// +optional
VolumeClaimTemplate *corev1.PersistentVolumeClaim `json:"volumeClaimTemplate,omitempty"`
// AdditionalArgs for redis-server
// +optional
AdditionalArgs string `json:"additionalargs,omitempty"`
}
func (s *RedisSpec) validate(fp *field.Path) field.ErrorList {
errs := field.ErrorList{}
if s == nil {
return errs
}
if s.Operator {
errs = append(errs, field.Invalid(fp.Child("operator"), "", "Operator is not supported in this version"))
}
return errs
}
// FlowerSpec defines the attributes to deploy Flower component
type FlowerSpec struct {
// Image defines the Flower Docker image.
// +optional
Image string `json:"image,omitempty"`
// Version defines the Flower Docker image version.
// +optional
Version string `json:"version,omitempty"`
// Replicas defines the number of running Flower instances in a cluster
Replicas int32 `json:"replicas,omitempty"`
// Resources is the resource requests and limits for the pods.
// +optional
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
}
func (s *FlowerSpec) validate(fp *field.Path) field.ErrorList {
errs := field.ErrorList{}
return errs
}
// SchedulerSpec defines the attributes and desired state of Airflow Scheduler
type SchedulerSpec struct {
// Image defines the Airflow custom server Docker image.
// +optional
Image string `json:"image,omitempty"`
// Version defines the Airflow Docker image version
// +optional
Version string `json:"version,omitempty"`
// DBName defines the Airflow Database to be used
// +optional
DBName string `json:"database,omitempty"`
// DBUser defines the Airflow Database user to be used
// +optional
DBUser string `json:"dbuser,omitempty"`
// Resources is the resource requests and limits for the pods.
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
}
func (s *SchedulerSpec) validate(fp *field.Path) field.ErrorList {
return field.ErrorList{}
}
// WorkerSpec defines the attributes and desired state of Airflow workers
type WorkerSpec struct {
// Image defines the Airflow worker Docker image.
// +optional
Image string `json:"image,omitempty"`
// Version defines the Airflow worker Docker image version
// +optional
Version string `json:"version,omitempty"`
// Replicas is the number of workers
Replicas int32 `json:"replicas,omitempty"`
// Resources is the resource requests and limits for the pods.
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
}
func (s *WorkerSpec) validate(fp *field.Path) field.ErrorList {
return field.ErrorList{}
}
// GCSSpec defines the attributes needed to sync DAGs from a GCS bucket
type GCSSpec struct {
// Bucket describes the GCS bucket
Bucket string `json:"bucket,omitempty"`
// Once syncs initially and quits (use init container instead of sidecar)
Once bool `json:"once,omitempty"`
}
func (s *GCSSpec) validate(fp *field.Path) field.ErrorList {
errs := field.ErrorList{}
if s == nil {
return errs
}
if s.Bucket == "" {
errs = append(errs, field.Required(fp.Child("bucket"), "bucket required"))
}
if s.Once {
errs = append(errs, field.NotSupported(fp.Child("once"), "true", []string{}))
}
return errs
}
// GitSpec defines the attributes needed to sync DAGs from a git repo
type GitSpec struct {
// Repo describes the http/ssh uri for git repo
Repo string `json:"repo"`
// Branch describes the branch name to be synced
Branch string `json:"branch,omitempty"`
// Rev is the git hash to be used for syncing
Rev string `json:"rev,omitempty"`
// User for git access
User string `json:"user,omitempty"`
// Once syncs initially and quits (use init container instead of sidecar)
Once bool `json:"once,omitempty"`
// Reference to git credentials (user, password, ssh etc)
CredSecretRef *corev1.LocalObjectReference `json:"cred,omitempty"`
}
func (s *GitSpec) validate(fp *field.Path) field.ErrorList {
errs := field.ErrorList{}
if s == nil {
return errs
}
if s.Repo == "" {
errs = append(errs, field.Required(fp.Child("repo"), "repo required"))
}
if s.CredSecretRef != nil && s.CredSecretRef.Name == "" {
errs = append(errs, field.Required(fp.Child("cred", "name"), "name missing"))
}
return errs
}
// DagSpec defines where the DAGs are located and how to access them
type DagSpec struct {
// DagSubdir is the directory under source where the dags are present
DagSubdir string `json:"subdir,omitempty"`
// GitSpec defines details to pull DAGs from a git repo using
// github.com/kubernetes/git-sync sidecar
Git *GitSpec `json:"git,omitempty"`
// NfsPV is the PersistentVolumeClaim for NFS-hosted DAGs
NfsPV *corev1.PersistentVolumeClaim `json:"nfspv,omitempty"`
// Storage is an S3-compatible storage spec to copy DAG files from
Storage *StorageSpec `json:"storage,omitempty"`
// GCS defines the GCS bucket to sync DAGs from
GCS *GCSSpec `json:"gcs,omitempty"`
}
func (s *DagSpec) validate(fp *field.Path) field.ErrorList {
errs := field.ErrorList{}
if s == nil {
return errs
}
if s.NfsPV != nil {
errs = append(errs, field.NotSupported(fp.Child("nfspv"), "", []string{}))
}
if s.Storage != nil {
errs = append(errs, field.NotSupported(fp.Child("storage"), "", []string{}))
}
errs = append(errs, s.Git.validate(fp.Child("git"))...)
errs = append(errs, s.GCS.validate(fp.Child("gcs"))...)
return errs
}
// SecretEnv defines an environment variable sourced from a Secret
type SecretEnv struct {
// Env - env variable name
Env string `json:"env,omitempty"`
// Secret - secret name
Secret string `json:"secret,omitempty"`
// Field - field name
Field string `json:"field,omitempty"`
}
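// An illustrative entry (secret and key names are examples, not defaults):
//
//	SecretEnv{Env: "AIRFLOW__CORE__FERNET_KEY", Secret: "airflow-secrets", Field: "fernet-key"}
//
// injects the value of key "fernet-key" from Secret "airflow-secrets" into
// the pods as the env var AIRFLOW__CORE__FERNET_KEY.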
// ClusterConfig is used to capture the config for Airflow
type ClusterConfig struct {
// AirflowEnv defines a map of env variables injected into the nodes
// +optional
AirflowEnv map[string]string `json:"airflow,omitempty"`
// AirflowSecretEnv defines a list of secret-backed env variables
// +optional
AirflowSecretEnv []SecretEnv `json:"airflowsecret,omitempty"`
}
// AirflowClusterSpec defines the desired state of AirflowCluster
type AirflowClusterSpec struct {
// Selector for fitting pods to nodes whose labels match the selector.
// https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
// +optional
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// Define scheduling constraints for pods.
// +optional
Affinity *corev1.Affinity `json:"affinity,omitempty"`
// Custom annotations to be added to the pods.
// +optional
Annotations map[string]string `json:"annotations,omitempty"`
// Custom labels to be added to the pods.
// +optional
Labels map[string]string `json:"labels,omitempty"`
// Executor selects the desired Airflow executor: Local, Celery, Sequential, or Kubernetes
// +optional
Executor string `json:"executor,omitempty"`
// Airflow config as env list
// +optional
Config ClusterConfig `json:"config,omitempty"`
// Spec for MemoryStore component
// +optional
MemoryStore *MemoryStoreSpec `json:"memoryStore,omitempty"`
// Spec for Redis component.
// +optional
Redis *RedisSpec `json:"redis,omitempty"`
// Spec for Airflow Scheduler component.
// +optional
Scheduler *SchedulerSpec `json:"scheduler,omitempty"`
// Spec for Airflow Workers
// +optional
Worker *WorkerSpec `json:"worker,omitempty"`
// Spec for Airflow UI component.
// +optional
UI *AirflowUISpec `json:"ui,omitempty"`
// Spec for Flower component.
// +optional
Flower *FlowerSpec `json:"flower,omitempty"`
// Spec for DAG source and location
// +optional
DAGs *DagSpec `json:"dags,omitempty"`
// AirflowBaseRef is a reference to the AirflowBase CR
AirflowBaseRef *corev1.LocalObjectReference `json:"airflowbase,omitempty"`
}
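// An illustrative AirflowCluster manifest (group/version assumed to be
// airflow.k8s.io/v1alpha1; all values are examples, not defaults):
//
//	apiVersion: airflow.k8s.io/v1alpha1
//	kind: AirflowCluster
//	metadata:
//	  name: celery-cluster
//	spec:
//	  executor: Celery
//	  redis: {}
//	  scheduler:
//	    version: "1.10.2"
//	  worker:
//	    replicas: 2
//	  flower:
//	    replicas: 1
//	  ui:
//	    replicas: 1
//	  dags:
//	    subdir: "dags"
//	    git:
//	      repo: "https://github.com/example/dags.git"
//	  airflowbase:
//	    name: my-airflow-base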
// SchedulerStatus defines the observed state of Airflow Scheduler
type SchedulerStatus struct {
// DagCount is the number of DAGs observed
DagCount int32 `json:"dagcount,omitempty"`
// RunCount is the number of DAG runs observed
RunCount int32 `json:"runcount,omitempty"`
}
// MemoryStoreStatus defines the observed state of MemoryStore
type MemoryStoreStatus struct {
// CreateTime: Output only. The time the instance was created.
CreateTime string `json:"createTime,omitempty"`
// CurrentLocationID: Output only. The current zone where the Redis
// endpoint is placed.
CurrentLocationID string `json:"currentLocationId,omitempty"`
// StatusMessage: Output only. Additional information about the current
// status of this instance, if available.
StatusMessage string `json:"statusMessage,omitempty"`
// Host: Output only. Hostname or IP address of the exposed Redis endpoint used by
// clients to connect to the service.
Host string `json:"host,omitempty"`
// Port: Output only. The port number of the exposed Redis endpoint.
Port int64 `json:"port,omitempty"`
// State: Output only. The current state of this instance.
State string `json:"state,omitempty"`
status.Meta `json:",inline"`
status.ComponentMeta `json:",inline"`
}
// AirflowClusterStatus defines the observed state of AirflowCluster
type AirflowClusterStatus struct {
status.Meta `json:",inline"`
status.ComponentMeta `json:",inline"`
}
// AirflowCluster represents the Airflow Scheduler and workers for a single DAG
// folder. At a minimum it needs a SQL service (MySQL or SQLProxy) and the
// Airflow UI. In addition, for an installation with minimal external
// dependencies, NFS is also added.
// +kubebuilder:object:root=true
type AirflowCluster struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec AirflowClusterSpec `json:"spec,omitempty"`
Status AirflowClusterStatus `json:"status,omitempty"`
}
// Helper functions for the resources
// ApplyDefaults applies defaults to the AirflowCluster
func (b *AirflowCluster) ApplyDefaults() {
if b.Spec.Redis != nil {
if b.Spec.Redis.Image == "" {
b.Spec.Redis.Image = defaultRedisImage
}
if b.Spec.Redis.Version == "" {
b.Spec.Redis.Version = defaultRedisVersion
}
if b.Spec.Redis.RedisHost != "" && b.Spec.Redis.RedisPort == "" {
b.Spec.Redis.RedisPort = defaultRedisPort
}
}
if b.Spec.Scheduler != nil {
if b.Spec.Scheduler.Image == "" {
b.Spec.Scheduler.Image = defaultSchedulerImage
}
if b.Spec.Scheduler.Version == "" {
b.Spec.Scheduler.Version = defaultSchedulerVersion
}
if b.Spec.Scheduler.DBName == "" {
b.Spec.Scheduler.DBName = string(RandomAlphanumericString(16))
}
if b.Spec.Scheduler.DBUser == "" {
b.Spec.Scheduler.DBUser = string(RandomAlphanumericString(16))
}
}
if b.Spec.UI != nil {
if b.Spec.UI.Image == "" {
b.Spec.UI.Image = defaultUIImage
}
if b.Spec.UI.Version == "" {
b.Spec.UI.Version = defaultUIVersion
}
if b.Spec.UI.Replicas == 0 {
b.Spec.UI.Replicas = 1
}
}
if b.Spec.Flower != nil {
if b.Spec.Flower.Image == "" {
b.Spec.Flower.Image = defaultFlowerImage
}
if b.Spec.Flower.Version == "" {
b.Spec.Flower.Version = defaultFlowerVersion
}
if b.Spec.Flower.Replicas == 0 {
b.Spec.Flower.Replicas = 1
}
}
if b.Spec.Executor == "" {
b.Spec.Executor = defaultExecutor
}
if b.Spec.Worker != nil {
if b.Spec.Worker.Image == "" {
b.Spec.Worker.Image = defaultWorkerImage
}
if b.Spec.Worker.Version == "" {
b.Spec.Worker.Version = defaultWorkerVersion
}
if b.Spec.Worker.Replicas == 0 {
b.Spec.Worker.Replicas = 1
}
if b.Spec.Executor == ExecutorK8s {
b.Spec.Worker.Replicas = 0
}
}
if b.Spec.DAGs != nil {
if b.Spec.DAGs.Git != nil {
if b.Spec.DAGs.Git.Branch == "" {
b.Spec.DAGs.Git.Branch = defaultBranch
}
}
}
b.Status.ComponentList = status.ComponentList{}
finalizer.EnsureStandard(b)
}
// Validate the AirflowCluster
func (b *AirflowCluster) Validate() error {
errs := field.ErrorList{}
spec := field.NewPath("spec")
errs = append(errs, b.Spec.MemoryStore.validate(spec.Child("memoryStore"))...)
errs = append(errs, b.Spec.Redis.validate(spec.Child("redis"))...)
errs = append(errs, b.Spec.Scheduler.validate(spec.Child("scheduler"))...)
errs = append(errs, b.Spec.Worker.validate(spec.Child("worker"))...)
errs = append(errs, b.Spec.DAGs.validate(spec.Child("dags"))...)
errs = append(errs, b.Spec.UI.validate(spec.Child("ui"))...)
errs = append(errs, b.Spec.Flower.validate(spec.Child("flower"))...)
allowed := false
for _, executor := range allowedExecutors {
if executor == b.Spec.Executor {
allowed = true
}
}
if !allowed {
errs = append(errs, field.NotSupported(spec.Child("executor"), b.Spec.Executor, allowedExecutors))
}
if b.Spec.Scheduler == nil {
errs = append(errs, field.Required(spec.Child("scheduler"), "scheduler required"))
}
if b.Spec.Executor == ExecutorCelery {
if b.Spec.Redis == nil && b.Spec.MemoryStore == nil {
errs = append(errs, field.Required(spec.Child("redis"), "redis/memoryStore required for Celery executor"))
}
if b.Spec.Worker == nil {
errs = append(errs, field.Required(spec.Child("worker"), "worker required for Celery executor"))
}
}
if b.Spec.Executor == ExecutorK8s {
if b.Spec.Worker == nil {
errs = append(errs, field.Required(spec.Child("worker"), "worker required for Celery executor"))
}
}
if b.Spec.Flower != nil {
if b.Spec.Executor != ExecutorCelery {
errs = append(errs, field.Required(spec.Child("executor"), "celery executor required for Flower"))
}
}
if b.Spec.AirflowBaseRef == nil {
errs = append(errs, field.Required(spec.Child("airflowbase"), "airflowbase reference missing"))
} else if b.Spec.AirflowBaseRef.Name == "" {
errs = append(errs, field.Required(spec.Child("airflowbase", "name"), "name missing"))
}
return errs.ToAggregate()
}
// NewAirflowCluster returns a defaults-filled AirflowCluster object
func NewAirflowCluster(name, namespace, executor, base string, dags *DagSpec) *AirflowCluster {
c := AirflowCluster{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{},
Namespace: namespace,
},
}
c.Spec = AirflowClusterSpec{}
c.Spec.Executor = executor
c.Spec.Scheduler = &SchedulerSpec{}
c.Spec.UI = &AirflowUISpec{}
if executor == ExecutorCelery {
c.Spec.Redis = &RedisSpec{}
c.Spec.MemoryStore = &MemoryStoreSpec{}
c.Spec.Worker = &WorkerSpec{}
c.Spec.Flower = &FlowerSpec{}
}
c.Spec.DAGs = dags
c.Spec.AirflowBaseRef = &corev1.LocalObjectReference{Name: base}
c.ApplyDefaults()
return &c
}
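// An illustrative use of the constructor (names and repo are examples):
//
//	dags := &DagSpec{
//		DagSubdir: "dags",
//		Git:       &GitSpec{Repo: "https://github.com/example/dags.git"},
//	}
//	cluster := NewAirflowCluster("my-cluster", "default", ExecutorCelery, "my-base", dags)
//	// cluster.Spec now carries defaults (images, versions, replica counts)
//	// and the Celery components (Redis, MemoryStore, Worker, Flower).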
// AirflowClusterList contains a list of AirflowCluster
// +kubebuilder:object:root=true
type AirflowClusterList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []AirflowCluster `json:"items"`
}
func init() {
SchemeBuilder.Register(&AirflowCluster{}, &AirflowClusterList{})
}