// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package operator
import (
core ""
apierrors ""
metav1 ""
ctrl ""
runtimelog ""
operatorv1alpha1 ""
// OAPServerDynamicConfigReconciler reconciles a OAPServerDynamicConfig object
type OAPServerDynamicConfigReconciler struct {
Scheme *runtime.Scheme
type SortByConfigName []operatorv1alpha1.Config
func (a SortByConfigName) Len() int {
return len(a)
func (a SortByConfigName) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
func (a SortByConfigName) Less(i, j int) bool {
return a[i].Name < a[j].Name
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
func (r *OAPServerDynamicConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
log := runtimelog.FromContext(ctx)
log.Info("=====================reconcile started================================")
config := operatorv1alpha1.OAPServerDynamicConfig{}
if err := r.Client.Get(ctx, req.NamespacedName, &config); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
oapList := operatorv1alpha1.OAPServerList{}
opts := []client.ListOption{
if err := r.List(ctx, &oapList, opts...); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, fmt.Errorf("failed to list oapserver: %w", err)
// get the specific version's oapserver
for i := range oapList.Items {
if oapList.Items[i].Spec.Version == config.Spec.Version {
oapServer := oapList.Items[i]
// Update the dynamic configuration
err := r.UpdateDynamicConfig(ctx, log, &oapServer, &config)
if err != nil {
log.Error(err, "failed to update the dynamic configuration")
return ctrl.Result{}, err
if err := r.checkState(ctx, log, &config); err != nil {
log.Error(err, "failed to update OAPServerDynamicConfig's status")
return ctrl.Result{}, err
return ctrl.Result{RequeueAfter: schedDuration}, nil
func (r *OAPServerDynamicConfigReconciler) UpdateDynamicConfig(ctx context.Context, log logr.Logger,
oapServer *operatorv1alpha1.OAPServer, config *operatorv1alpha1.OAPServerDynamicConfig) error {
changed := false
exsitedConfiguration := map[string]bool{}
dynamicConfigList := operatorv1alpha1.OAPServerDynamicConfigList{}
opts := []client.ListOption{
if err := r.List(ctx, &dynamicConfigList, opts...); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to list OAPServerDynamicConfig: %w", err)
for _, i := range dynamicConfigList.Items {
if i.Name != config.Name {
for _, n := range i.Spec.Data {
exsitedConfiguration[n.Name] = true
configuration := config.Spec.Data
for i := range configuration {
_, ok := exsitedConfiguration[configuration[i].Name]
if ok {
return fmt.Errorf("the configuration %s already exist", configuration[i].Name)
newMd5Hash := MD5Hash(configuration)
labelSelector, err := labels.Parse(config.Spec.LabelSelector)
if err != nil {
log.Error(err, "failed to parse string to labelselector")
return err
label, err := labels.ConvertSelectorToLabelsMap(config.Spec.LabelSelector)
if err != nil {
log.Error(err, "failed to convert labelselector to map")
return err
configmap := core.ConfigMap{}
err = r.Client.Get(ctx, client.ObjectKey{Namespace: config.Namespace,
Name: config.Name}, &configmap)
if err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "failed to get the dynamic configuration configmap")
return err
// if the configmap exist and the dynamic configuration or labelselector changed, then delete it
if !apierrors.IsNotFound(err) {
oldMd5Hash := configmap.Labels["md5"]
// check dynamic configuration
if oldMd5Hash != newMd5Hash {
changed = true
// check labelselector
if !labelSelector.Matches(labels.Set(configmap.Labels)) {
changed = true
if changed {
if err := r.Client.Delete(ctx, &configmap); err != nil {
log.Error(err, "faled to delete the dynamic configuration configmap")
} else {
log.Info("dynamic configuration keeps the same as before")
return nil
// set the version label
label["version"] = config.Spec.Version
// set the configuration type
label["OAPServerConfig"] = "dynamic"
// set the md5 value
label["md5"] = newMd5Hash
// set the data
data := map[string]string{}
for _, v := range configuration {
data[v.Name] = v.Value
configmap = core.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.Name,
Namespace: oapServer.Namespace,
OwnerReferences: []metav1.OwnerReference{
APIVersion: config.APIVersion,
Kind: config.Kind,
Name: config.Name,
UID: config.UID,
Labels: label,
Data: data,
// create the dynamic configuration configmap
if err := r.Client.Create(ctx, &configmap); err != nil {
log.Error(err, "failed to create dynamic configuration configmap")
return err
log.Info("successfully setup the dynamic configuration")
return nil
func (r *OAPServerDynamicConfigReconciler) checkState(ctx context.Context, log logr.Logger,
config *operatorv1alpha1.OAPServerDynamicConfig) error {
errCol := new(kubernetes.ErrorCollector)
nilTime := metav1.Time{}
now := metav1.NewTime(time.Now())
overlay := operatorv1alpha1.OAPServerDynamicConfigStatus{
State: "Stopped",
// get dynamic configuration's state
configmapList := core.ConfigMapList{}
opts := []client.ListOption{
"version": config.Spec.Version,
"OAPServerConfig": "dynamic",
if err := r.List(ctx, &configmapList, opts...); err != nil && !apierrors.IsNotFound(err) {
errCol.Collect(fmt.Errorf("failed to list configmap: %w", err))
for i := range configmapList.Items {
if strings.Contains(configmapList.Items[i].Name, config.Name) {
overlay.State = "Running"
if config.Status.CreationTime == nilTime {
overlay.CreationTime = now
overlay.LastUpdateTime = now
} else {
overlay.CreationTime = config.Status.CreationTime
overlay.LastUpdateTime = now
config.Status = overlay
config.Kind = "OAPServerDynamicConfig"
if err := kubernetes.ApplyOverlay(config, &operatorv1alpha1.OAPServerDynamicConfig{Status: overlay}); err != nil {
errCol.Collect(fmt.Errorf("failed to apply overlay: %w", err))
return errCol.Error()
if err := r.updateStatus(ctx, config, overlay, errCol); err != nil {
errCol.Collect(fmt.Errorf("failed to update status of OAPServerDynamicConfig: %w", err))
log.Info("updated OAPServerDynamicConfig sub resource")
return errCol.Error()
func (r *OAPServerDynamicConfigReconciler) updateStatus(ctx context.Context, config *operatorv1alpha1.OAPServerDynamicConfig,
overlay operatorv1alpha1.OAPServerDynamicConfigStatus, errCol *kubernetes.ErrorCollector) error {
// avoid resource conflict
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := r.Client.Get(ctx, client.ObjectKey{Name: config.Name, Namespace: config.Namespace}, config); err != nil {
errCol.Collect(fmt.Errorf("failed to get OAPServerDynamicConfig: %w", err))
config.Status = overlay
config.Kind = "OAPServerDynamicConfig"
if err := kubernetes.ApplyOverlay(config, &operatorv1alpha1.OAPServerDynamicConfig{Status: overlay}); err != nil {
errCol.Collect(fmt.Errorf("failed to apply overlay: %w", err))
if err := r.Status().Update(ctx, config); err != nil {
errCol.Collect(fmt.Errorf("failed to update status: %w", err))
return errCol.Error()
// SetupWithManager sets up the controller with the Manager.
func (r *OAPServerDynamicConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
func MD5Hash(a interface{}) string {
h := md5.New()
h.Write([]byte(fmt.Sprintf("%v", a)))
return fmt.Sprintf("%x", h.Sum(nil))