// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package helmreconciler

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	kctldeployment "k8s.io/kubectl/pkg/util/deployment"

	"github.com/apache/dubbo-go-pixiu/operator/pkg/name"
	"github.com/apache/dubbo-go-pixiu/operator/pkg/object"
	"github.com/apache/dubbo-go-pixiu/operator/pkg/util/progress"
	"github.com/apache/dubbo-go-pixiu/pkg/kube"
)

const (
	// defaultWaitResourceTimeout is the maximum wait time for all resources (namespace/deployment/pod) to be created.
defaultWaitResourceTimeout = 300 * time.Second
// cRDPollInterval is how often the state of CRDs is polled when waiting for their creation.
cRDPollInterval = 500 * time.Millisecond
// cRDPollTimeout is the maximum wait time for all CRDs to be created.
cRDPollTimeout = 60 * time.Second
)

// deployment holds a Deployment together with its newest ReplicaSet.
type deployment struct {
	replicaSets *appsv1.ReplicaSet
	deployment  *appsv1.Deployment
}

// WaitForResources polls to get the current status of all namespaces,
// deployments, daemon sets, and stateful sets in the manifest
// until all are ready or a timeout is reached.
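//
// A minimal usage sketch (hypothetical caller; objects, client, and
// manifestLog are assumed to be supplied by the surrounding reconciler):
//
//	if err := WaitForResources(objects, client, defaultWaitResourceTimeout, false, manifestLog); err != nil {
//		return err
//	}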
func WaitForResources(objects object.K8sObjects, client kube.Client,
waitTimeout time.Duration, dryRun bool, l *progress.ManifestLog) error {
if dryRun || TestMode {
return nil
}
if err := waitForCRDs(objects, client); err != nil {
return err
}
var notReady []string
var debugInfo map[string]string
	// Check if we are ready immediately, to avoid the 2s delay below when we are already ready.
if ready, _, _, err := waitForResources(objects, client, l); err == nil && ready {
return nil
}
errPoll := wait.Poll(2*time.Second, waitTimeout, func() (bool, error) {
isReady, notReadyObjects, debugInfoObjects, err := waitForResources(objects, client, l)
notReady = notReadyObjects
debugInfo = debugInfoObjects
return isReady, err
})
messages := []string{}
for _, id := range notReady {
debug, f := debugInfo[id]
if f {
messages = append(messages, fmt.Sprintf(" %s (%s)", id, debug))
} else {
messages = append(messages, fmt.Sprintf(" %s", debug))
}
}
	if errPoll != nil {
		return fmt.Errorf("resources not ready after %v: %v\n%s", waitTimeout, errPoll, strings.Join(messages, "\n"))
	}
return nil
}
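
// waitForResources fetches the current state of each namespace, deployment,
// daemon set, and stateful set in the manifest and reports whether all of them
// are ready, returning the IDs of any that are not along with per-resource
// debug information.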
func waitForResources(objects object.K8sObjects, cs kubernetes.Interface, l *progress.ManifestLog) (bool, []string, map[string]string, error) {
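	// Note: pods is never populated below (there is no Pod case in the switch),
	// so podsReady always sees an empty list; pod-level failures are surfaced
	// via extractPodFailureReason instead.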
pods := []corev1.Pod{}
deployments := []deployment{}
daemonsets := []*appsv1.DaemonSet{}
statefulsets := []*appsv1.StatefulSet{}
namespaces := []corev1.Namespace{}
for _, o := range objects {
kind := o.GroupVersionKind().Kind
switch kind {
case name.NamespaceStr:
namespace, err := cs.CoreV1().Namespaces().Get(context.TODO(), o.Name, metav1.GetOptions{})
if err != nil {
return false, nil, nil, err
}
namespaces = append(namespaces, *namespace)
case name.DeploymentStr:
currentDeployment, err := cs.AppsV1().Deployments(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
if err != nil {
return false, nil, nil, err
}
_, _, newReplicaSet, err := kctldeployment.GetAllReplicaSets(currentDeployment, cs.AppsV1())
if err != nil || newReplicaSet == nil {
return false, nil, nil, err
}
			newDeployment := deployment{
				replicaSets: newReplicaSet,
				deployment:  currentDeployment,
			}
deployments = append(deployments, newDeployment)
case name.DaemonSetStr:
ds, err := cs.AppsV1().DaemonSets(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
if err != nil {
return false, nil, nil, err
}
daemonsets = append(daemonsets, ds)
case name.StatefulSetStr:
sts, err := cs.AppsV1().StatefulSets(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
if err != nil {
return false, nil, nil, err
}
statefulsets = append(statefulsets, sts)
}
}
resourceDebugInfo := map[string]string{}
dr, dnr := deploymentsReady(cs, deployments, resourceDebugInfo)
dsr, dsnr := daemonsetsReady(daemonsets)
stsr, stsnr := statefulsetsReady(statefulsets)
nsr, nnr := namespacesReady(namespaces)
pr, pnr := podsReady(pods)
isReady := dr && nsr && dsr && stsr && pr
notReady := append(append(append(append(nnr, dnr...), pnr...), dsnr...), stsnr...)
if !isReady {
l.ReportWaiting(notReady)
}
return isReady, notReady, resourceDebugInfo, nil
}
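
// waitForCRDs polls the API server until every CRD in the manifest reports the
// Established condition, or until cRDPollTimeout expires.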
func waitForCRDs(objects object.K8sObjects, client kube.Client) error {
var crdNames []string
for _, o := range object.KindObjects(objects, name.CRDStr) {
crdNames = append(crdNames, o.Name)
}
if len(crdNames) == 0 {
return nil
}
errPoll := wait.Poll(cRDPollInterval, cRDPollTimeout, func() (bool, error) {
descriptor:
for _, crdName := range crdNames {
crd, errGet := client.Ext().ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crdName, metav1.GetOptions{})
if errGet != nil {
return false, errGet
}
for _, cond := range crd.Status.Conditions {
switch cond.Type {
case apiextensions.Established:
if cond.Status == apiextensions.ConditionTrue {
scope.Infof("established CRD %s", crdName)
continue descriptor
}
case apiextensions.NamesAccepted:
if cond.Status == apiextensions.ConditionFalse {
scope.Warnf("name conflict for %v: %v", crdName, cond.Reason)
}
}
}
scope.Infof("missing status condition for %q", crdName)
return false, nil
}
return true, nil
})
if errPoll != nil {
scope.Errorf("failed to verify CRD creation; %s", errPoll)
return fmt.Errorf("failed to verify CRD creation: %s", errPoll)
}
scope.Info("Finished applying CRDs.")
return nil
}
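
// getPods lists the pods in the given namespace that match the label selector.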
func getPods(client kubernetes.Interface, namespace string, selector labels.Selector) ([]corev1.Pod, error) {
list, err := client.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: selector.String(),
})
return list.Items, err
}
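
// namespacesReady reports whether every namespace is Active, returning the
// names of any that are not.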
func namespacesReady(namespaces []corev1.Namespace) (bool, []string) {
var notReady []string
for _, namespace := range namespaces {
if namespace.Status.Phase != corev1.NamespaceActive {
notReady = append(notReady, "Namespace/"+namespace.Name)
}
}
return len(notReady) == 0, notReady
}
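
// podsReady reports whether every pod is ready, returning the IDs of any that
// are not.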
func podsReady(pods []corev1.Pod) (bool, []string) {
var notReady []string
for _, pod := range pods {
if !isPodReady(&pod) {
notReady = append(notReady, "Pod/"+pod.Namespace+"/"+pod.Name)
}
}
return len(notReady) == 0, notReady
}
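
// isPodReady reports whether the pod has a PodReady condition with status True.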
func isPodReady(pod *corev1.Pod) bool {
	for _, condition := range pod.Status.Conditions {
		if condition.Type == corev1.PodReady &&
			condition.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}
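
// deploymentsReady reports whether each deployment's newest replica set has at
// least as many ready replicas as the deployment requests; for deployments that
// are not ready, a pod-level failure reason (if any) is recorded in info.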
func deploymentsReady(cs kubernetes.Interface, deployments []deployment, info map[string]string) (bool, []string) {
var notReady []string
for _, v := range deployments {
if v.replicaSets.Status.ReadyReplicas >= *v.deployment.Spec.Replicas {
// Ready
continue
}
id := "Deployment/" + v.deployment.Namespace + "/" + v.deployment.Name
notReady = append(notReady, id)
failure := extractPodFailureReason(cs, v.deployment.Namespace, v.deployment.Spec.Selector)
if failure != "" {
info[id] = failure
}
}
return len(notReady) == 0, notReady
}
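
// extractPodFailureReason returns a human-readable reason why the most recently
// created pod matching the selector is not ready, or "" if no reason is found.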
func extractPodFailureReason(client kubernetes.Interface, namespace string, selector *metav1.LabelSelector) string {
sel, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
return fmt.Sprintf("failed to get label selector: %v", err)
}
pods, err := getPods(client, namespace, sel)
if err != nil {
return fmt.Sprintf("failed to fetch pods: %v", err)
}
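	// Examine pods newest-first so the most recent failure is reported.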
sort.Slice(pods, func(i, j int) bool {
return pods[i].CreationTimestamp.After(pods[j].CreationTimestamp.Time)
})
for _, pod := range pods {
for _, cs := range pod.Status.ContainerStatuses {
if cs.State.Waiting != nil {
return fmt.Sprintf("container failed to start: %v: %v", cs.State.Waiting.Reason, cs.State.Waiting.Message)
}
}
if c := getCondition(pod.Status.Conditions, corev1.PodReady); c != nil && c.Status == corev1.ConditionFalse {
			return c.Message
}
}
return ""
}
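
// getCondition returns the pod condition of the requested type, or nil if the
// pod does not have it.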
func getCondition(conditions []corev1.PodCondition, condition corev1.PodConditionType) *corev1.PodCondition {
for _, cond := range conditions {
if cond.Type == condition {
return &cond
}
}
return nil
}
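
// daemonsetsReady reports whether every daemon set has been observed at its
// current generation and has scheduled and readied the expected number of pods.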
func daemonsetsReady(daemonsets []*appsv1.DaemonSet) (bool, []string) {
var notReady []string
for _, ds := range daemonsets {
		// Verify that the observed generation matches the current generation.
		// The remaining status fields are only meaningful when they describe the
		// current spec; if the generations differ, the daemon set is not ready.
if ds.Status.ObservedGeneration != ds.Generation {
scope.Infof("DaemonSet is not ready: %s/%s. Observed generation: %d expected generation: %d",
ds.Namespace, ds.Name, ds.Status.ObservedGeneration, ds.Generation)
notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
} else {
// Make sure all the updated pods have been scheduled
if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType &&
ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
scope.Infof("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
}
if ds.Spec.UpdateStrategy.Type == appsv1.RollingUpdateDaemonSetStrategyType {
if ds.Status.DesiredNumberScheduled <= 0 {
					// If DesiredNumberScheduled is less than or equal to 0, one of
					// several things may be true:
					// 1) the daemon set was just created
					// 2) the daemon set does not desire any pods
					// 3) someone changed it manually
					// None of these cases indicate readiness.
					scope.Infof("DaemonSet is not ready: %s/%s. Initializing, no pods are running",
						ds.Namespace, ds.Name)
notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
} else if ds.Status.NumberReady < ds.Status.DesiredNumberScheduled {
// Make sure every node has a ready pod
scope.Infof("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready",
ds.Namespace, ds.Name, ds.Status.NumberReady, ds.Status.UpdatedNumberScheduled)
notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
}
}
}
}
return len(notReady) == 0, notReady
}
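
// statefulsetsReady reports whether every stateful set has scheduled the number
// of updated replicas expected by its update strategy.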
func statefulsetsReady(statefulsets []*appsv1.StatefulSet) (bool, []string) {
var notReady []string
for _, sts := range statefulsets {
// Make sure all the updated pods have been scheduled
if sts.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType &&
sts.Status.UpdatedReplicas != sts.Status.Replicas {
scope.Infof("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, sts.Status.Replicas)
notReady = append(notReady, "StatefulSet/"+sts.Namespace+"/"+sts.Name)
}
if sts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType {
			// Dereference the pointer fields, substituting defaults when nil.
			var partition int
			// The default replica count for a stateful set is 1.
			replicas := 1
// the rollingUpdate field can be nil even if the update strategy is a rolling update.
if sts.Spec.UpdateStrategy.RollingUpdate != nil &&
sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
}
if sts.Spec.Replicas != nil {
replicas = int(*sts.Spec.Replicas)
}
expectedReplicas := replicas - partition
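			// For example, with replicas=5 and partition=2, only the pods with
			// ordinals 2 through 4 are updated, so 3 updated replicas are expected.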
// Make sure all the updated pods have been scheduled
if int(sts.Status.UpdatedReplicas) != expectedReplicas {
scope.Infof("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
notReady = append(notReady, "StatefulSet/"+sts.Namespace+"/"+sts.Name)
continue
}
}
}
return len(notReady) == 0, notReady
}