blob: 676c499247eeddbdd8c3278714e3733cb9f05e45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package kubernetes
import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"time"
"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)
const (
// this const is available here https://github.com/kubernetes/kubernetes/blob/6e0cb243d57592c917fe449dde20b0e246bc66be/pkg/controller/deployment/util/deployment_util.go#L100
// but it doesn't worth the dependency.
deploymentMinimumReplicasUnavailable = "MinimumReplicasUnavailable"
)
// IsDeploymentAvailable verifies if the Deployment conditions match the Available status
// IsDeploymentAvailable reports whether the Deployment currently carries the
// Available condition with status True.
func IsDeploymentAvailable(deployment *appsv1.Deployment) bool {
	available := isDeploymentInCondition(deployment, appsv1.DeploymentAvailable, v1.ConditionTrue)
	return available
}
// IsDeploymentFailed returns true in case of Deployment not available (IsDeploymentAvailable returns false) or it has a condition of
// DeploymentReplicaFailure == true.
// IsDeploymentFailed reports whether the Deployment is not available (see
// IsDeploymentAvailable) AND carries a ReplicaFailure condition with status True.
func IsDeploymentFailed(deployment *appsv1.Deployment) bool {
	// An available deployment is never considered failed, regardless of
	// any replica-failure condition a secondary replica may have raised.
	return !IsDeploymentAvailable(deployment) &&
		isDeploymentInCondition(deployment, appsv1.DeploymentReplicaFailure, v1.ConditionTrue)
}
// isDeploymentInCondition reports whether the Deployment's status holds a
// condition of the given type with the given status value.
func isDeploymentInCondition(deployment *appsv1.Deployment, conditionType appsv1.DeploymentConditionType, status v1.ConditionStatus) bool {
	conditions := deployment.Status.Conditions
	for i := range conditions {
		if conditions[i].Type == conditionType && conditions[i].Status == status {
			return true
		}
	}
	return false
}
// IsDeploymentMinimumReplicasUnavailable verifies if the deployment has the minimum replicas available
// IsDeploymentMinimumReplicasUnavailable reports whether the Deployment's
// Available condition is False with reason MinimumReplicasUnavailable, i.e.
// the deployment does not have its minimum number of replicas available.
func IsDeploymentMinimumReplicasUnavailable(deployment *appsv1.Deployment) bool {
	for _, cond := range deployment.Status.Conditions {
		if cond.Type != appsv1.DeploymentAvailable {
			continue
		}
		if cond.Status == v1.ConditionFalse && cond.Reason == deploymentMinimumReplicasUnavailable {
			return true
		}
	}
	return false
}
// GetDeploymentUnavailabilityMessage returns a string explaining why the given deployment is unavailable. If empty, there's no replica failure.
// Note that the Deployment might be available, but a second replica failed to scale. Always check IsDeploymentAvailable.
//
// See: https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#failed-deployment
// GetDeploymentUnavailabilityMessage returns a string explaining why the given deployment is unavailable. If empty, there's no replica failure.
// Note that the Deployment might be available, but a second replica failed to scale. Always check IsDeploymentAvailable.
//
// See: https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#failed-deployment
func GetDeploymentUnavailabilityMessage(deployment *appsv1.Deployment) string {
	for _, cond := range deployment.Status.Conditions {
		// Either condition signals a problem worth surfacing:
		// Progressing=False (rollout stalled) or ReplicaFailure=True.
		stalled := cond.Type == appsv1.DeploymentProgressing && cond.Status == v1.ConditionFalse
		replicaFailure := cond.Type == appsv1.DeploymentReplicaFailure && cond.Status == v1.ConditionTrue
		if stalled || replicaFailure {
			return fmt.Sprintf("deployment %s unavailable: reason %s, message %s", deployment.Name, cond.Reason, cond.Message)
		}
	}
	return ""
}
// MarkDeploymentToRollout marks the given Deployment to restart now. The object must be updated.
// Code adapted from here: https://github.com/kubernetes/kubectl/blob/release-1.26/pkg/polymorphichelpers/objectrestarter.go#L44
// MarkDeploymentToRollout marks the given Deployment to restart now by stamping
// the pod template with a restartedAt annotation. The object must be updated
// by the caller afterwards.
// Code adapted from here: https://github.com/kubernetes/kubectl/blob/release-1.26/pkg/polymorphichelpers/objectrestarter.go#L44
func MarkDeploymentToRollout(deployment *appsv1.Deployment) error {
	// A paused deployment ignores template changes, so a restart stamp would be lost.
	if deployment.Spec.Paused {
		return errors.New("can't restart paused deployment (run rollout resume first)")
	}
	annotations := deployment.Spec.Template.ObjectMeta.Annotations
	if annotations == nil {
		annotations = make(map[string]string)
		deployment.Spec.Template.ObjectMeta.Annotations = annotations
	}
	klog.V(log.I).Infof("Triggering restart of %s", deployment.Name)
	annotations[metadata.RestartedAt] = time.Now().Format(time.RFC3339)
	return nil
}
// AnnotateDeploymentConfigChecksum adds the checksum/config annotation to the template annotations of the Deployment to set the current configuration.
// If the checksum has changed from the previous value, the restartedAt annotation is also added and a new rollout is started.
// Code adapted from here: https://github.com/kubernetes/kubectl/blob/release-1.26/pkg/polymorphichelpers/objectrestarter.go#L44
// AnnotateDeploymentConfigChecksum adds the checksum/config annotation to the template annotations of the Deployment to set the current configuration.
// If the checksum has changed from the previous value, the restartedAt annotation is also added and a new rollout is started.
// Code adapted from here: https://github.com/kubernetes/kubectl/blob/release-1.26/pkg/polymorphichelpers/objectrestarter.go#L44
func AnnotateDeploymentConfigChecksum(workflow *operatorapi.SonataFlow, deployment *appsv1.Deployment, userPropsCM *v1.ConfigMap, managedPropsCM *v1.ConfigMap) error {
	// A paused deployment ignores template changes, so annotating it would be lost.
	if deployment.Spec.Paused {
		return errors.New("can't restart paused deployment (run rollout resume first)")
	}
	annotations := deployment.Spec.Template.ObjectMeta.Annotations
	if annotations == nil {
		annotations = make(map[string]string)
		deployment.Spec.Template.ObjectMeta.Annotations = annotations
	}
	// A missing key yields "", which doubles as the "first deployment" marker below.
	previousChecksum := annotations[metadata.Checksum]
	newChecksum, err := calculateHash(userPropsCM, managedPropsCM, workflow)
	if err != nil {
		return err
	}
	if newChecksum == previousChecksum {
		klog.V(log.I).Infof("Skipping update of deployment %s, checksum unchanged", deployment.Name)
		return nil
	}
	klog.V(log.I).Infof("Updating checksum of %s", deployment.Name)
	annotations[metadata.Checksum] = newChecksum
	// Only trigger a rollout when replacing an existing checksum; on first
	// deployment there is nothing running that needs restarting.
	if previousChecksum != "" {
		klog.V(log.I).Infof("Triggering rollout of %s", deployment.Name)
		annotations[metadata.RestartedAt] = time.Now().Format(time.RFC3339)
	}
	return nil
}
// dataFromCM returns the value stored under key in the ConfigMap's Data,
// or "" when the ConfigMap is nil or the key is absent.
func dataFromCM(cm *v1.ConfigMap, key string) string {
	// Guard against a nil ConfigMap pointer: calculateHash forwards whatever
	// the caller passed, and dereferencing cm.Data on nil would panic.
	if cm == nil {
		return ""
	}
	// A missing map key yields the zero value "", so no explicit ok-check is needed.
	return cm.Data[key]
}
// calculateHash returns the hex-encoded SHA-256 of the user and managed
// application properties, joined by a comma, for the given workflow.
func calculateHash(userPropsCM, managedPropsCM *v1.ConfigMap, workflow *operatorapi.SonataFlow) (string, error) {
	userProps := dataFromCM(userPropsCM, workflowproj.ApplicationPropertiesFileName)
	managedProps := dataFromCM(managedPropsCM, workflowproj.GetManagedPropertiesFileName(workflow))
	hasher := sha256.New()
	if _, err := hasher.Write([]byte(fmt.Sprintf("%s,%s", userProps, managedProps))); err != nil {
		return "", err
	}
	return hex.EncodeToString(hasher.Sum(nil)), nil
}
// GetContainerByName returns a pointer to the Container within the given Deployment.
// If none found, returns nil.
// It also returns the position where the container was found, -1 if none
// GetContainerByName returns a pointer to the Container within the given PodSpec.
// If none found, returns nil.
// It also returns the position where the container was found, -1 if none.
//
// The returned pointer addresses the element inside podSpec.Containers, so
// mutations through it are reflected in the PodSpec.
func GetContainerByName(name string, podSpec *v1.PodSpec) (*v1.Container, int) {
	if podSpec == nil {
		return nil, -1
	}
	for i := range podSpec.Containers {
		if podSpec.Containers[i].Name == name {
			// Point into the slice itself: the original `&container` returned the
			// address of a loop-variable copy, so callers' writes were discarded.
			return &podSpec.Containers[i], i
		}
	}
	return nil, -1
}
// GetContainerPortByName returns a pointer to the ContainerPort within the given Container.
// If none found, returns nil.
// It also returns the position where the container port was found, -1 if none.
// GetContainerPortByName returns a pointer to the ContainerPort within the given Container.
// If none found, returns nil.
// It also returns the position where the container port was found, -1 if none.
//
// The returned pointer addresses the element inside container.Ports, so
// mutations through it are reflected in the Container.
func GetContainerPortByName(name string, container *v1.Container) (*v1.ContainerPort, int) {
	if container == nil {
		return nil, -1
	}
	for i := range container.Ports {
		if container.Ports[i].Name == name {
			// Point into the slice itself: the original `&containerPort` returned
			// the address of a loop-variable copy, so callers' writes were discarded.
			return &container.Ports[i], i
		}
	}
	return nil, -1
}
// AddOrReplaceContainer replace the existing container or add if it doesn't exist in the .spec.containers attribute
// AddOrReplaceContainer replaces the container with the given name in
// .spec.containers, or appends it when no container with that name exists.
func AddOrReplaceContainer(containerName string, container v1.Container, podSpec *v1.PodSpec) {
	if _, pos := GetContainerByName(containerName, podSpec); pos >= 0 {
		podSpec.Containers[pos] = container
		return
	}
	podSpec.Containers = append(podSpec.Containers, container)
}