// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package controllers
import (
"context"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
)
// DSWorkerReconciler reconciles a DSWorker object
type DSWorkerReconciler struct {
client.Client
Scheme *runtime.Scheme
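// recorder emits Kubernetes Events for DSWorker objects; it is
// initialized in SetupWithManager.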
recorder record.EventRecorder
}
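// workerLogger is the shared logger for the DSWorker controller.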
var (
workerLogger = ctrl.Log.WithName("DSWorker-controller")
)
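// The +kubebuilder:rbac markers below are read by controller-gen to
// generate the RBAC manifests this controller needs.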
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;create;delete;list;watch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state. It
// compares the state specified by the DSWorker object against the actual
// cluster state, and then performs operations to make the cluster state
// reflect the state specified by the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
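//
// The flow below: fetch the DSWorker, handle the finalizer, honor
// Spec.Paused, initialize the phase on first sight, then ensure
// members, scale, and version before marking the phase finished.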
func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
workerLogger.Info("dmWorker start reconcile logic")
defer workerLogger.Info("dmWorker Reconcile end ---------------------------------------------")
cluster := &dsv1alpha1.DSWorker{}
if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
if apierrors.IsNotFound(err) {
// The object is gone; an event cannot be recorded against the empty object.
workerLogger.Info("dsWorker resource not found, ignoring since it must have been deleted")
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
desired := cluster.DeepCopy()
// Handle finalizer.
// Examine DeletionTimestamp to determine if the object is under deletion.
if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then let's add the finalizer and update the object. This is equivalent
// to registering our finalizer.
if !controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
controllerutil.AddFinalizer(desired, dsv1alpha1.FinalizerName)
if err := r.Update(ctx, desired); err != nil {
return ctrl.Result{}, err
}
}
} else {
// The object is being deleted
if controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
// our finalizer is present, so let's handle any external dependency
if err := r.ensureDSWorkerDeleted(ctx, cluster); err != nil {
return ctrl.Result{}, err
}
// remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(desired, dsv1alpha1.FinalizerName)
if err := r.Update(ctx, desired); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
// If the dsworker cluster is paused, we do nothing when things change.
// Once it is un-paused, we reconcile to the dsworker state at that point.
if cluster.Spec.Paused {
workerLogger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name)
desired.Status.ControlPaused = true
if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
} else {
masterLogger.Error(err, "unexpected error when worker update status in paused")
return ctrl.Result{}, err
}
}
r.recorder.Event(cluster, corev1.EventTypeNormal, "Paused", "ds-worker cluster is paused, skipping reconcile")
return ctrl.Result{}, nil
}
// 1. First time we see the ds-worker-cluster, initialize it
if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
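// Populate the status selector once; a selector string in .status is
// conventionally what the scale subresource reports.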
if desired.Status.Selector == "" {
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: LabelForWorkerPod()})
if err != nil {
masterLogger.Error(err, "Error retrieving selector labels")
return reconcile.Result{}, err
}
desired.Status.Selector = selector.String()
}
desired.Status.Phase = dsv1alpha1.DsPhaseCreating
workerLogger.Info("phase had been changed from none ---> creating")
if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, nil
} else {
masterLogger.Error(err, "unexpected error when worker update status in creating")
return ctrl.Result{}, err
}
}
}
// 2. Ensure members are bootstrapped; we requeue here until the cluster is up and healthy
workerLogger.Info("Ensuring cluster members")
if requeue, err := r.ensureMembers(ctx, cluster); requeue {
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
// 3. Ensure cluster scaled
workerLogger.Info("Ensuring cluster scaled")
if requeue, err := r.ensureScaled(ctx, cluster); requeue {
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
// 4. Ensure cluster upgraded
workerLogger.Info("Ensuring cluster upgraded")
if requeue, err := r.ensureUpgraded(ctx, cluster); requeue {
return ctrl.Result{Requeue: true}, err
}
desired.Status.Phase = dsv1alpha1.DsPhaseFinished
if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
} else {
masterLogger.Error(err, "unexpected error when worker update status in finished")
return ctrl.Result{}, err
}
}
workerLogger.Info("******************************************************")
desired.Status.Phase = dsv1alpha1.DsPhaseNone
return ctrl.Result{Requeue: false}, nil
}
// SetupWithManager sets up the controller with the Manager.
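//
// A typical registration from main.go looks roughly like this (a sketch,
// assuming the standard kubebuilder scaffolding and its setupLog):
//
//	if err := (&DSWorkerReconciler{
//		Client: mgr.GetClient(),
//		Scheme: mgr.GetScheme(),
//	}).SetupWithManager(mgr); err != nil {
//		setupLog.Error(err, "unable to create controller", "controller", "DSWorker")
//		os.Exit(1)
//	}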
func (r *DSWorkerReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.recorder = mgr.GetEventRecorderFor("worker-controller")
return ctrl.NewControllerManagedBy(mgr).
For(&dsv1alpha1.DSWorker{}).
Owns(&corev1.Pod{}).
Complete(r)
}
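// ensureMembers reports whether the reconcile should be requeued: it
// requeues while any existing worker pod is not yet healthy.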
func (r *DSWorkerReconciler) ensureMembers(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) {
pms, err := r.podMemberSet(ctx, cluster)
if err != nil {
return true, err
}
if len(pms) > 0 {
return !allMembersHealth(pms), nil
}
return false, nil
}
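// ensureScaled compares the live worker pods against Spec.Replicas and
// creates or deletes one member per pass, requeueing until the counts
// match.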
func (r *DSWorkerReconciler) ensureScaled(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) {
// Get current members in this cluster
ms, err := r.podMemberSet(ctx, cluster)
if err != nil {
return true, err
}
// Scale up
if len(ms) < cluster.Spec.Replicas {
err = r.createMember(ctx, cluster)
if err != nil {
r.recorder.Event(cluster, corev1.EventTypeWarning, "cannot create the new ds-dsworker pod", "the ds-dsworker pod had been created failed")
return true, err
}
return true, nil
}
// Scale down
if len(ms) > cluster.Spec.Replicas {
pod := &corev1.Pod{}
member := ms.PickOne()
pod.SetName(member.Name)
pod.SetNamespace(member.Namespace)
err = r.deleteMember(ctx, pod, cluster)
if err != nil {
return true, err
}
return true, nil
}
return false, nil
}
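// ensureUpgraded rolls the cluster toward Spec.Version by deleting one
// out-of-date pod per pass; a later pass recreates it at the current
// version via the scale-up path.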
func (r *DSWorkerReconciler) ensureUpgraded(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) {
ms, err := r.podMemberSet(ctx, cluster)
if err != nil {
return true, err
}
for _, member := range ms {
if member.Version != cluster.Spec.Version {
pod := &corev1.Pod{}
pod.SetName(member.Name)
pod.SetNamespace(member.Namespace)
if err := r.deleteMember(ctx, pod, cluster); err != nil {
return true, err
}
return true, nil
}
}
return false, nil
}
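// ensureDSWorkerDeleted performs the finalizer's cleanup; the orphan
// propagation policy deletes the DSWorker without cascading to its
// dependents.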
func (r *DSWorkerReconciler) ensureDSWorkerDeleted(ctx context.Context, cluster *dsv1alpha1.DSWorker) error {
return r.Client.Delete(ctx, cluster, client.PropagationPolicy(metav1.DeletePropagationOrphan))
}
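// createMember builds a new worker pod from the cluster spec and
// submits it, tolerating AlreadyExists from a previous pass.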
func (r *DSWorkerReconciler) createMember(ctx context.Context, cluster *dsv1alpha1.DSWorker) error {
workerLogger.Info("Starting add new member to cluster", "cluster", cluster.Name)
defer workerLogger.Info("End add new member to cluster", "cluster", cluster.Name)
// New Pod
pod, err := r.newDSWorkerPod(ctx, cluster)
if err != nil {
return err
}
// Create pod
if err = r.Client.Create(ctx, pod); err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
// Replica bookkeeping is derived from the live pod list on the next
// reconcile pass, so there is nothing to patch here.
return nil
}
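// deleteMember deletes the given worker pod, tolerating NotFound when
// it is already gone.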
func (r *DSWorkerReconciler) deleteMember(ctx context.Context, pod *corev1.Pod, cluster *dsv1alpha1.DSWorker) error {
workerLogger.Info("begin delete pod", "pod name", pod.Name)
if err := r.Client.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
return err
}
// As in createMember, the next reconcile pass re-lists the pods, so no
// replica bookkeeping is needed here.
return nil
}