blob: e997b8b6c3f1c6bb51938ff00eb71e8f3b69328d [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package operator
import (
"context"
"fmt"
"sort"
"time"
"github.com/go-logr/logr"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimelog "sigs.k8s.io/controller-runtime/pkg/log"
operatorv1alpha1 "github.com/apache/skywalking-swck/operator/apis/operator/v1alpha1"
"github.com/apache/skywalking-swck/operator/pkg/kubernetes"
)
// OAPServerConfigReconciler reconciles a OAPServerConfig object
type OAPServerConfigReconciler struct {
client.Client
Scheme *runtime.Scheme
}
type SortByFileName []operatorv1alpha1.FileConfig
func (a SortByFileName) Len() int {
return len(a)
}
func (a SortByFileName) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a SortByFileName) Less(i, j int) bool {
return a[i].Name < a[j].Name
}
type SortByEnvName []core.EnvVar
func (a SortByEnvName) Len() int {
return len(a)
}
func (a SortByEnvName) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a SortByEnvName) Less(i, j int) bool {
return a[i].Name < a[j].Name
}
// +kubebuilder:rbac:groups=operator.skywalking.apache.org,resources=oapserverconfigs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.skywalking.apache.org,resources=oapserverconfigs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.skywalking.apache.org,resources=oapservers,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.skywalking.apache.org,resources=oapservers/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
func (r *OAPServerConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := runtimelog.FromContext(ctx)
log.Info("=====================oapserverconfig reconcile started================================")
oapServerConfig := operatorv1alpha1.OAPServerConfig{}
if err := r.Client.Get(ctx, req.NamespacedName, &oapServerConfig); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
oapList := operatorv1alpha1.OAPServerList{}
opts := []client.ListOption{
client.InNamespace(req.Namespace),
}
if err := r.List(ctx, &oapList, opts...); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, fmt.Errorf("failed to list oapserver: %w", err)
}
// get the specific version's oapserver
for i := range oapList.Items {
if oapList.Items[i].Spec.Version == oapServerConfig.Spec.Version {
oapServer := oapList.Items[i]
deployment := apps.Deployment{}
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name + "-oap"}, &deployment); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, fmt.Errorf("failed to get the deployment of OAPServer: %w", err)
}
// overlay the env configuration
envChanged, err := r.OverlayEnv(log, &oapServerConfig, &deployment)
if err != nil {
log.Error(err, "failed to overlay the env configuration")
}
// overlay the file configuration
fileChanged, err := r.OverlayStaticFile(ctx, log, &oapServerConfig, &deployment)
if err != nil {
log.Error(err, "failed to overlay the file configuration")
}
// update the deployment
if envChanged || fileChanged {
if err := r.Client.Update(ctx, &deployment); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to update the deployment of OAPServer: %w", err)
}
}
}
}
if err := r.checkState(ctx, log, &oapServerConfig, oapList); err != nil {
log.Error(err, "failed to update OAPServerConfig's status")
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: schedDuration}, nil
}
func (r *OAPServerConfigReconciler) OverlayEnv(log logr.Logger,
oapServerConfig *operatorv1alpha1.OAPServerConfig, deployment *apps.Deployment) (bool, error) {
changed := false
sort.Sort(SortByEnvName(oapServerConfig.Spec.Env))
newMd5Hash := MD5Hash(oapServerConfig.Spec.Env)
oldMd5Hash, ok := deployment.Spec.Template.Labels["md5-env"]
if !ok || oldMd5Hash != newMd5Hash {
changed = true
}
if changed {
deployment.Spec.Template.Spec.Containers[0].Env = oapServerConfig.Spec.Env
deployment.Spec.Template.Labels["md5-env"] = newMd5Hash
} else {
log.Info("env configuration keeps the same as before")
return changed, nil
}
log.Info("successfully overlay the env configuration")
return changed, nil
}
func (r *OAPServerConfigReconciler) OverlayStaticFile(ctx context.Context, log logr.Logger,
oapServerConfig *operatorv1alpha1.OAPServerConfig, deployment *apps.Deployment) (bool, error) {
changed := false
sort.Sort(SortByFileName(oapServerConfig.Spec.File))
newMd5Hash := MD5Hash(oapServerConfig.Spec.File)
configmap := core.ConfigMap{}
err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServerConfig.Namespace,
Name: oapServerConfig.Name}, &configmap)
if err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "failed to get the static file configuration's configmap")
return changed, err
}
// if the configmap exist and the static configuration changed, then delete it
if !apierrors.IsNotFound(err) {
oldMd5Hash := configmap.Labels["md5-file"]
if oldMd5Hash != newMd5Hash {
changed = true
if err := r.Client.Delete(ctx, &configmap); err != nil {
log.Error(err, "faled to delete the static file configuration's configmap")
}
} else {
log.Info("file configuration keeps the same as before")
return changed, nil
}
}
data := make(map[string]string)
mounts := []core.VolumeMount{}
volume := core.Volume{
Name: oapServerConfig.Name,
VolumeSource: core.VolumeSource{
ConfigMap: &core.ConfigMapVolumeSource{
LocalObjectReference: core.LocalObjectReference{
Name: oapServerConfig.Name,
},
},
},
}
for _, f := range oapServerConfig.Spec.File {
mounts = append(mounts, core.VolumeMount{
MountPath: f.Path + "/" + f.Name,
Name: oapServerConfig.Name,
SubPath: f.Name,
})
data[f.Name] = f.Data
}
labels := make(map[string]string)
// set the version label
labels["version"] = oapServerConfig.Spec.Version
// set the configuration type
labels["oapServerConfig"] = "static"
// set the md5 value of the data
labels["md5-file"] = newMd5Hash
// create configmap for static files
configmap = core.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: oapServerConfig.Name,
Namespace: oapServerConfig.Namespace,
OwnerReferences: []metav1.OwnerReference{
metav1.OwnerReference{
APIVersion: oapServerConfig.APIVersion,
Kind: oapServerConfig.Kind,
Name: oapServerConfig.Name,
UID: oapServerConfig.UID,
},
},
Labels: labels,
},
Data: data,
}
if err := r.Client.Create(ctx, &configmap); err != nil {
log.Error(err, "failed to create static configuration configmap")
return changed, err
}
overlayDeployment := deployment
overlayDeployment.Spec.Template.Spec.Containers[0].VolumeMounts = mounts
overlayDeployment.Spec.Template.Spec.Volumes = []core.Volume{volume}
if err := kubernetes.ApplyOverlay(deployment, overlayDeployment); err != nil {
log.Error(err, "failed to apply overlay deployment")
}
log.Info("successfully overlay the file configuration")
return changed, nil
}
func (r *OAPServerConfigReconciler) checkState(ctx context.Context, log logr.Logger,
oapServerConfig *operatorv1alpha1.OAPServerConfig, oapList operatorv1alpha1.OAPServerList) error {
errCol := new(kubernetes.ErrorCollector)
nilTime := metav1.Time{}
now := metav1.NewTime(time.Now())
overlay := operatorv1alpha1.OAPServerConfigStatus{}
// get Instances and AvailableReplicas
for i := range oapList.Items {
if oapList.Items[i].Spec.Version == oapServerConfig.Spec.Version {
overlay.Desired += int(oapList.Items[i].Spec.Instances)
overlay.Ready += int(oapList.Items[i].Status.AvailableReplicas)
}
}
if oapServerConfig.Status.CreationTime == nilTime {
overlay.CreationTime = now
overlay.LastUpdateTime = now
} else {
overlay.CreationTime = oapServerConfig.Status.CreationTime
overlay.LastUpdateTime = now
}
oapServerConfig.Status = overlay
oapServerConfig.Kind = "OAPServerConfig"
if err := kubernetes.ApplyOverlay(oapServerConfig, &operatorv1alpha1.OAPServerConfig{Status: overlay}); err != nil {
errCol.Collect(fmt.Errorf("failed to apply overlay: %w", err))
return errCol.Error()
}
if err := r.updateStatus(ctx, oapServerConfig, overlay, errCol); err != nil {
errCol.Collect(fmt.Errorf("failed to update status of OAPServerConfig: %w", err))
}
log.Info("updated OAPServerConfig sub resource")
return errCol.Error()
}
func (r *OAPServerConfigReconciler) updateStatus(ctx context.Context, oapServerConfig *operatorv1alpha1.OAPServerConfig,
overlay operatorv1alpha1.OAPServerConfigStatus, errCol *kubernetes.ErrorCollector) error {
// avoid resource conflict
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := r.Client.Get(ctx, client.ObjectKey{Name: oapServerConfig.Name, Namespace: oapServerConfig.Namespace}, oapServerConfig); err != nil {
errCol.Collect(fmt.Errorf("failed to get oapServerConfig: %w", err))
}
oapServerConfig.Status = overlay
oapServerConfig.Kind = "OAPServerConfig"
if err := kubernetes.ApplyOverlay(oapServerConfig, &operatorv1alpha1.OAPServerConfig{Status: overlay}); err != nil {
errCol.Collect(fmt.Errorf("failed to apply overlay: %w", err))
}
if err := r.Status().Update(ctx, oapServerConfig); err != nil {
errCol.Collect(fmt.Errorf("failed to update status: %w", err))
}
return errCol.Error()
})
}
// SetupWithManager sets up the controller with the Manager.
func (r *OAPServerConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&operatorv1alpha1.OAPServerConfig{}).
Owns(&apps.Deployment{}).
Complete(r)
}