blob: 8bda945bfc10cf35d67de8a0fcbece4650b72010 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package controllers
import (
"context"
"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/api/rbac/v1"
"time"
dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
)
var (
masterLogger = ctrl.Log.WithName("DSMaster-controller")
)
// DSMasterReconciler reconciles a DSMaster object
type DSMasterReconciler struct {
client.Client
Scheme *runtime.Scheme
recorder record.EventRecorder
}
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters/finalizers,verbs=update
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;create;delete;list;watch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;create;delete
//+kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=role,verbs=get;list;create;delete
//+kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=rolebinding,verbs=get;list;create;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// the DSMaster object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
masterLogger.Info("dmMaster start reconcile logic")
defer masterLogger.Info("dmMaster Reconcile end ---------------------------------------------")
cluster := &dsv1alpha1.DSMaster{}
if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
desired := cluster.DeepCopy()
sa := &corev1.ServiceAccount{}
saReq := ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: req.Namespace,
Name: dsv1alpha1.DsServiceAccount,
},
}
err := r.Get(ctx, saReq.NamespacedName, sa)
if apierrors.IsNotFound(err) {
err := r.createServiceAccountIfNotExists(ctx, cluster)
if err != nil {
return ctrl.Result{}, err
}
}
// Handler finalizer
// examine DeletionTimestamp to determine if object is under deletion
if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
controllerutil.AddFinalizer(desired, dsv1alpha1.FinalizerName)
if err := r.Update(ctx, desired); err != nil {
return ctrl.Result{}, err
}
}
} else {
// The object is being deleted
if controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
// our finalizer is present, so lets handle any external dependency
if err := r.ensureDSMasterDeleted(ctx, cluster); err != nil {
return ctrl.Result{}, err
}
// remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(desired, dsv1alpha1.FinalizerName)
if err := r.Update(ctx, desired); err != nil {
return ctrl.Result{}, err
}
}
// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
}
// If dsmaster-cluster is paused, we do nothing on things changed.
// Until dsmaster-cluster is un-paused, we will reconcile to the state of that point.
if cluster.Spec.Paused {
masterLogger.Info("ds-master control has been paused: ", "ds-master-name", cluster.Name)
desired.Status.ControlPaused = true
if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
} else {
masterLogger.Error(err, "unexpected error when master update status in paused")
return ctrl.Result{}, err
}
}
r.recorder.Event(cluster, corev1.EventTypeNormal, "the master spec status is paused", "do nothing")
return ctrl.Result{}, nil
}
// 1. First time we see the ds-master-cluster, initialize it
if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
desired.Status.Phase = dsv1alpha1.DsPhaseCreating
masterLogger.Info("phase had been changed from none ---> creating")
if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
} else {
masterLogger.Error(err, "unexpected error when master update status in creating")
return ctrl.Result{}, err
}
}
}
//2 ensure the headless service
masterLogger.Info("Ensuring cluster service")
if err := r.ensureMasterService(ctx, cluster); err != nil {
return ctrl.Result{}, err
}
//2 ensure the headless service
masterLogger.Info("Ensuring worker hpa")
if err := r.ensureHPA(ctx, cluster); err != nil {
return ctrl.Result{}, err
}
// 4. Ensure bootstrapped, we will block here util cluster is up and healthy
masterLogger.Info("Ensuring cluster members")
if requeue, err := r.ensureMembers(ctx, cluster); requeue {
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
// 5. Ensure cluster scaled
masterLogger.Info("Ensuring cluster scaled")
if requeue, err := r.ensureScaled(ctx, cluster); requeue {
return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, err
}
// 6. Ensure cluster upgraded
masterLogger.Info("Ensuring cluster upgraded")
if requeue, err := r.ensureUpgraded(ctx, cluster); requeue {
return ctrl.Result{Requeue: true}, err
}
desired.Status.Phase = dsv1alpha1.DsPhaseFinished
if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
} else {
masterLogger.Error(err, "unexpected error when master update status in finished")
return ctrl.Result{}, err
}
}
masterLogger.Info("******************************************************")
desired.Status.Phase = dsv1alpha1.DsPhaseNone
if err := r.Update(ctx, desired); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: false}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.recorder = mgr.GetEventRecorderFor("master-controller")
filter := &Predicate{}
return ctrl.NewControllerManagedBy(mgr).
For(&dsv1alpha1.DSMaster{}).
Owns(&corev1.Pod{}).
Owns(&corev1.Service{}).
Owns(&v2beta2.HorizontalPodAutoscaler{}).
Owns(&corev1.ServiceAccount{}).
Owns(&v1.Role{}).
Owns(&v1.RoleBinding{}).
// or use WithEventFilter()
WithEventFilter(filter).
Complete(r)
}
func (r *DSMasterReconciler) ensureMembers(ctx context.Context, cluster *dsv1alpha1.DSMaster) (bool, error) {
pms, err := r.podMemberSet(ctx, cluster)
if err != nil {
return true, err
}
if len(pms) > 0 {
return !allMembersHealth(pms), nil
} else {
return false, nil
}
}
func (r *DSMasterReconciler) ensureScaled(ctx context.Context, cluster *dsv1alpha1.DSMaster) (bool, error) {
// Get current members in this cluster
ms, err := r.podMemberSet(ctx, cluster)
if err != nil {
return true, err
}
// Scale up
if len(ms) < cluster.Spec.Replicas {
err = r.createMember(ctx, cluster)
if err != nil {
r.recorder.Event(cluster, corev1.EventTypeWarning, "cannot create the new ds-master pod", "the ds-master pod had been created failed")
return true, err
}
// Cluster modified, next reconcile will enter r.ensureMembers()
return true, err
}
// Scale down
if len(ms) > cluster.Spec.Replicas {
pod := &corev1.Pod{}
member := ms.PickOne()
pod.SetName(member.Name)
pod.SetNamespace(member.Namespace)
err = r.deletePod(ctx, pod)
if err != nil {
return true, err
}
return true, err
}
return false, nil
}
func (r *DSMasterReconciler) createMember(ctx context.Context, cluster *dsv1alpha1.DSMaster) error {
masterLogger.Info("Starting add new member to cluster", "cluster", cluster.Name)
defer masterLogger.Info("End add new member to cluster", "cluster", cluster.Name)
// New Pod
pod, err := r.newDSMasterPod(cluster)
if err != nil {
return err
}
// Create pod
if err = r.Client.Create(ctx, pod); err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return nil
}
func (r *DSMasterReconciler) deletePod(ctx context.Context, pod *corev1.Pod) error {
masterLogger.Info("begin delete pod", "pod name", pod.Name)
if err := r.Client.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
return err
}
return nil
}
func (r *DSMasterReconciler) ensureUpgraded(ctx context.Context, cluster *dsv1alpha1.DSMaster) (bool, error) {
ms, err := r.podMemberSet(ctx, cluster)
if err != nil {
return false, err
}
masterLogger.Info("cluster.Spec.Version", "cluster.Spec.Version", cluster.Spec.Version)
for _, memset := range ms {
if r.predicateUpdate(memset, cluster) {
pod := &corev1.Pod{}
pod.SetName(memset.Name)
pod.SetNamespace(memset.Namespace)
if err := r.deletePod(ctx, pod); err != nil {
return false, err
}
return true, nil
}
}
return false, nil
}
func getNeedUpgradePods(ctx context.Context, cli *kubernetes.Clientset, cluster *dsv1alpha1.DSMaster) (*corev1.PodList, error) {
podSelector, err := labels.NewRequirement(dsv1alpha1.DsVersionLabel, selection.NotIn, []string{cluster.Spec.Version})
if err != nil {
return nil, err
}
podAppSelect, err := labels.NewRequirement(dsv1alpha1.DsAppName, selection.Equals, []string{dsv1alpha1.DsMasterLabel})
if err != nil {
return nil, err
}
selector := labels.NewSelector()
selector = selector.Add(*podSelector).Add(*podAppSelect)
podListOptions := metav1.ListOptions{
LabelSelector: selector.String(),
}
return cli.CoreV1().Pods(cluster.Namespace).List(ctx, podListOptions)
}
func (r *DSMasterReconciler) ensureMasterService(ctx context.Context, cluster *dsv1alpha1.DSMaster) error {
// 1. Client service
service := &corev1.Service{}
namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsHeadLessServiceLabel}
if err := r.Client.Get(ctx, namespacedName, service); err != nil {
// Local cache not found
if apierrors.IsNotFound(err) && !apierrors.IsAlreadyExists(err) {
service = createMasterService(cluster)
if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil {
return err
}
// Remote may already exist, so we will return err, for the next time, this code will not execute
if err := r.Client.Create(ctx, service); err != nil {
return err
}
r.recorder.Event(cluster, corev1.EventTypeNormal, "ds-operator-service created", "master headless service had been created")
}
}
return nil
}
type Predicate struct{}
// Create will be trigger when object created or controller restart
// first time see the object
func (r *Predicate) Create(evt event.CreateEvent) bool {
switch evt.Object.(type) {
case *dsv1alpha1.DSMaster:
return true
}
return false
}
func (r *Predicate) Update(evt event.UpdateEvent) bool {
switch evt.ObjectNew.(type) {
case *dsv1alpha1.DSMaster:
oldC := evt.ObjectOld.(*dsv1alpha1.DSMaster)
newC := evt.ObjectNew.(*dsv1alpha1.DSMaster)
// Only care about size, repo,version and paused fields
if oldC.Spec.Replicas != newC.Spec.Replicas {
return true
}
if oldC.Spec.Paused != newC.Spec.Paused {
return true
}
if oldC.Spec.Version != newC.Spec.Version {
return true
}
if oldC.Spec.Repository != newC.Spec.Repository {
return true
}
// If cluster has been marked as deleted, check if we have removed our finalizer
// If it has our finalizer, indicating our cleaning up works has not been done.
if oldC.DeletionTimestamp.IsZero() && !newC.DeletionTimestamp.IsZero() {
if controllerutil.ContainsFinalizer(newC, dsv1alpha1.FinalizerName) {
return true
}
}
}
return false
}
func (r *Predicate) Delete(evt event.DeleteEvent) bool {
switch evt.Object.(type) {
case *dsv1alpha1.DSMaster:
return true
case *corev1.Pod:
return true
case *corev1.Service:
return true
}
return false
}
func (r *Predicate) Generic(evt event.GenericEvent) bool {
switch evt.Object.(type) {
case *dsv1alpha1.DSMaster:
return true
}
return false
}
func (r *DSMasterReconciler) predicateUpdate(member *Member, cluster *dsv1alpha1.DSMaster) bool {
return member.Version != cluster.Spec.Version
}
func (r *DSMasterReconciler) ensureHPA(ctx context.Context, cluster *dsv1alpha1.DSMaster) error {
hpa := &v2beta2.HorizontalPodAutoscaler{}
namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsWorkerHpa}
if err := r.Client.Get(ctx, namespacedName, hpa); err != nil {
// Local cache not found
if apierrors.IsNotFound(err) && cluster.Spec.HpaPolicy != nil {
hpa := r.createHPA(cluster)
if err := controllerutil.SetControllerReference(cluster, hpa, r.Scheme); err != nil {
masterLogger.Info("set controller worker hpa error")
return err
}
// Remote may already exist, so we will return err, for the next time, this code will not execute
if err := r.Client.Create(ctx, hpa); err != nil {
masterLogger.Info("create worker hpa error")
return err
}
}
}
if hpa.Kind != "" && cluster.Spec.HpaPolicy == nil {
if err := r.deleteHPA(ctx, hpa); err != nil {
masterLogger.Info("delete hpa error")
return err
}
}
return nil
}
// 创建 ServiceAccount
func (r *DSMasterReconciler) createServiceAccountIfNotExists(ctx context.Context, cluster *dsv1alpha1.DSMaster) (err error) {
masterLogger.Info("start create service account.")
sa := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: dsv1alpha1.DsServiceAccount,
Namespace: cluster.Namespace,
},
}
err = r.Create(ctx, sa)
if err != nil {
masterLogger.Error(err, "create service account error")
return err
}
// binding the sa
err = controllerutil.SetControllerReference(cluster, sa, r.Scheme)
if err != nil {
masterLogger.Error(err, "sa SetControllerReference error")
return err
}
ro := &v1.Role{}
namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsRole}
if err := r.Client.Get(ctx, namespacedName, ro); err != nil {
if apierrors.IsNotFound(err) && !apierrors.IsAlreadyExists(err) {
// Remote may already exist, so we will return err, for the next time, this code will not execute
ro := r.createRole(cluster)
if err := controllerutil.SetControllerReference(cluster, ro, r.Scheme); err != nil {
masterLogger.Info("set controller role error")
return err
}
masterLogger.Info("set role begin")
if err := r.Client.Create(ctx, ro); err != nil {
return err
}
}
}
rb := &v1.RoleBinding{}
rbNamespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsRoleBinding}
if err := r.Client.Get(ctx, rbNamespacedName, rb); err != nil {
if apierrors.IsNotFound(err) && !apierrors.IsAlreadyExists(err) {
rb := r.createRoleBinding(cluster)
if err := controllerutil.SetControllerReference(cluster, rb, r.Scheme); err != nil {
masterLogger.Info("set controller rolebinding error")
return err
}
masterLogger.Info("set rolebinding begin")
if err := r.Client.Create(ctx, rb); err != nil {
return err
}
}
}
return nil
}