// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helmreconciler
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"

"istio.io/api/label"
"istio.io/api/operator/v1alpha1"
"istio.io/pkg/version"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/dubbo-go-pixiu/istioctl/pkg/util/formatting"
istioV1Alpha1 "github.com/apache/dubbo-go-pixiu/operator/pkg/apis/istio/v1alpha1"
"github.com/apache/dubbo-go-pixiu/operator/pkg/metrics"
"github.com/apache/dubbo-go-pixiu/operator/pkg/name"
"github.com/apache/dubbo-go-pixiu/operator/pkg/object"
"github.com/apache/dubbo-go-pixiu/operator/pkg/util"
"github.com/apache/dubbo-go-pixiu/operator/pkg/util/clog"
"github.com/apache/dubbo-go-pixiu/operator/pkg/util/progress"
"github.com/apache/dubbo-go-pixiu/pkg/config/analysis"
"github.com/apache/dubbo-go-pixiu/pkg/config/analysis/analyzers/webhook"
"github.com/apache/dubbo-go-pixiu/pkg/config/analysis/local"
"github.com/apache/dubbo-go-pixiu/pkg/config/constants"
"github.com/apache/dubbo-go-pixiu/pkg/config/resource"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
)
// HelmReconciler reconciles resources rendered by a set of helm charts.
type HelmReconciler struct {
client client.Client
kubeClient kube.Client
iop *istioV1Alpha1.IstioOperator
opts *Options
// manifests is a copy of the most recently generated manifests.
manifests name.ManifestMap
// dependencyWaitCh is a map of signaling channels. A parent with children ch1...chN will signal
// dependencyWaitCh[ch1]...dependencyWaitCh[chN] when it's completely installed.
dependencyWaitCh map[name.ComponentName]chan struct{}
// The fields below are for metrics and reporting
countLock *sync.Mutex
prunedKindSet map[schema.GroupKind]struct{}
}
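// The sketch below shows the handshake that dependencyWaitCh implements; it is purely
// illustrative (childName and parentName are placeholder variables) and mirrors the
// logic in processRecursive. A child component blocks on its channel until its parent,
// having finished applying, sends a single struct{}:
//
//	// Child side: block until the parent signals readiness.
//	if ch := h.dependencyWaitCh[childName]; ch != nil {
//		<-ch
//	}
//	// Parent side: after applying, release every dependent child.
//	for _, child := range ComponentDependencies[parentName] {
//		h.dependencyWaitCh[child] <- struct{}{}
//	}
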
// Options are options for HelmReconciler.
type Options struct {
// DryRun executes all actions but does not write anything to the cluster.
DryRun bool
// Log is a console logger for user visible CLI output.
Log clog.Logger
// Wait determines if we will wait for resources to be fully applied. Only applies to components that have no
// dependencies.
Wait bool
// WaitTimeout controls the amount of time to wait for resources in a component to become ready before giving up.
WaitTimeout time.Duration
// ProgressLog tracks the installation progress for all components.
ProgressLog *progress.Log
// Force proceeds with the operation even if validation errors are present.
Force bool
// SkipPrune skips the resource-pruning step.
SkipPrune bool
}
var defaultOptions = &Options{
Log: clog.NewDefaultLogger(),
ProgressLog: progress.NewLog(),
}
// NewHelmReconciler creates a HelmReconciler and returns a pointer to it.
func NewHelmReconciler(client client.Client, kubeClient kube.Client, iop *istioV1Alpha1.IstioOperator, opts *Options) (*HelmReconciler, error) {
if opts == nil {
// Copy the defaults so later mutations (e.g. WaitTimeout below) don't leak into the shared defaultOptions.
o := *defaultOptions
opts = &o
}
if opts.ProgressLog == nil {
opts.ProgressLog = progress.NewLog()
}
if opts.WaitTimeout == 0 {
if waitForResourcesTimeoutStr, found := os.LookupEnv("WAIT_FOR_RESOURCES_TIMEOUT"); found {
if waitForResourcesTimeout, err := time.ParseDuration(waitForResourcesTimeoutStr); err == nil {
opts.WaitTimeout = waitForResourcesTimeout
} else {
scope.Warnf("invalid env variable value: %s for 'WAIT_FOR_RESOURCES_TIMEOUT'! falling back to default value...", waitForResourcesTimeoutStr)
// fallback to default wait resource timeout
opts.WaitTimeout = defaultWaitResourceTimeout
}
} else {
// Fall back to the default wait-for-resources timeout.
opts.WaitTimeout = defaultWaitResourceTimeout
}
}
if iop == nil {
// allows controller code to function for cases where IOP is not provided (e.g. operator remove).
iop = &istioV1Alpha1.IstioOperator{}
iop.Spec = &v1alpha1.IstioOperatorSpec{}
}
return &HelmReconciler{
client: client,
kubeClient: kubeClient,
iop: iop,
opts: opts,
dependencyWaitCh: initDependencies(),
countLock: &sync.Mutex{},
prunedKindSet: make(map[schema.GroupKind]struct{}),
}, nil
}
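// A minimal construction/usage sketch (cl, kc, and iop are assumed to come from the
// caller's controller or CLI setup; the option values are illustrative):
//
//	rec, err := NewHelmReconciler(cl, kc, iop, &Options{
//		Log:         clog.NewDefaultLogger(),
//		ProgressLog: progress.NewLog(),
//		WaitTimeout: 5 * time.Minute,
//	})
//	if err != nil {
//		return err
//	}
//	status, err := rec.Reconcile()
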
// initDependencies initializes the dependencies channel tree.
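// Only names that appear as a child of some parent get a channel; root components are
// absent from the map, so processRecursive's nil check lets them start immediately.
// The buffer of one lets a parent signal each child without blocking.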
func initDependencies() map[name.ComponentName]chan struct{} {
ret := make(map[name.ComponentName]chan struct{})
for _, parent := range ComponentDependencies {
for _, child := range parent {
ret[child] = make(chan struct{}, 1)
}
}
return ret
}
// Reconcile reconciles the associated resources.
func (h *HelmReconciler) Reconcile() (*v1alpha1.InstallStatus, error) {
if err := h.createNamespace(istioV1Alpha1.Namespace(h.iop.Spec), h.networkName()); err != nil {
return nil, err
}
manifestMap, err := h.RenderCharts()
if err != nil {
return nil, err
}
err = h.analyzeWebhooks(manifestMap[name.PilotComponentName])
if err != nil {
if h.opts.Force {
scope.Error("invalid webhook configs; continuing because of --force")
} else {
return nil, err
}
}
status := h.processRecursive(manifestMap)
var pruneErr error
if !h.opts.SkipPrune {
h.opts.ProgressLog.SetState(progress.StatePruning)
pruneErr = h.Prune(manifestMap, false)
h.reportPrunedObjectKind()
}
return status, pruneErr
}
// processRecursive processes the given manifests in the order of the dependencies defined in h. Dependencies form a tree,
// where a child must wait for its parent to complete before starting.
func (h *HelmReconciler) processRecursive(manifests name.ManifestMap) *v1alpha1.InstallStatus {
componentStatus := make(map[string]*v1alpha1.InstallStatus_VersionStatus)
// mu protects the shared InstallStatus componentStatus across goroutines
var mu sync.Mutex
// wg waits for all manifest processing goroutines to finish
var wg sync.WaitGroup
serverSideApply := h.CheckSSAEnabled()
for c, ms := range manifests {
// Shadow the loop variables so each goroutine captures its own copy.
c, ms := c, ms
wg.Add(1)
go func() {
var processedObjs object.K8sObjects
var deployedObjects int
defer wg.Done()
if s := h.dependencyWaitCh[c]; s != nil {
scope.Infof("%s is waiting on dependency...", c)
<-s
scope.Infof("Dependency for %s has completed, proceeding.", c)
}
// Possible paths for status are RECONCILING -> {NONE, ERROR, HEALTHY}. NONE means component has no resources.
// In NONE case, the component is not shown in overall status.
mu.Lock()
setStatus(componentStatus, c, v1alpha1.InstallStatus_RECONCILING, nil)
mu.Unlock()
status := v1alpha1.InstallStatus_NONE
var err error
if len(ms) != 0 {
m := name.Manifest{
Name: c,
Content: name.MergeManifestSlices(ms),
}
processedObjs, deployedObjects, err = h.ApplyManifest(m, serverSideApply)
if err != nil {
status = v1alpha1.InstallStatus_ERROR
} else if len(processedObjs) != 0 || deployedObjects > 0 {
status = v1alpha1.InstallStatus_HEALTHY
}
}
mu.Lock()
setStatus(componentStatus, c, status, err)
mu.Unlock()
// Signal all the components that depend on us.
for _, ch := range ComponentDependencies[c] {
scope.Infof("Unblocking dependency %s.", ch)
h.dependencyWaitCh[ch] <- struct{}{}
}
}()
}
wg.Wait()
metrics.ReportOwnedResourceCounts()
out := &v1alpha1.InstallStatus{
Status: overallStatus(componentStatus),
ComponentStatus: componentStatus,
}
return out
}
// CheckSSAEnabled is a helper function to check whether ServerSideApply should be used when applying manifests.
func (h *HelmReconciler) CheckSSAEnabled() bool {
if h.kubeClient != nil {
// A mutating webhook in GKE could corrupt managedFields; this was fixed in k8s 1.18.
// See: https://github.com/kubernetes/kubernetes/issues/96351
if kube.IsAtLeastVersion(h.kubeClient, 18) {
// TODO(kebe7jun): find a more general detection method; for the time being the
// API server does not expose whether ServerSideApply is enabled.
ns, err := h.kubeClient.Kube().CoreV1().Namespaces().Get(context.TODO(), constants.KubeSystemNamespace, v12.GetOptions{})
if err != nil {
scope.Warnf("failed to get namespace: %v", err)
return false
}
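// On a cluster where SSA is active, objects carry managedFields metadata, e.g.:
//
//	managedFields:
//	- manager: kube-controller-manager
//	  operation: Update
//	  apiVersion: v1
//
// so a kube-system namespace with nil ManagedFields indicates the feature was
// manually disabled (the manager name above is only an example).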
if ns.ManagedFields == nil {
scope.Infof("k8s support ServerSideApply but was manually disabled")
return false
}
return true
}
}
return false
}
// Delete deletes the resources associated with the custom resource instance.
func (h *HelmReconciler) Delete() error {
defer func() {
metrics.ReportOwnedResourceCounts()
h.reportPrunedObjectKind()
}()
iop := h.iop
if iop.Spec.Revision == "" {
err := h.Prune(nil, true)
return err
}
// Delete an IOP with a revision set:
// in this case we update the status field to pending if proxies still point to this revision,
// and we do not prune shared resources; same effect as `istioctl uninstall --revision foo`.
status, err := h.PruneControlPlaneByRevisionWithController(iop.Spec)
if err != nil {
return err
}
// Check the status here because a terminating IOP's status can't be updated.
if status.Status == v1alpha1.InstallStatus_ACTION_REQUIRED {
return fmt.Errorf("action is required before deleting the iop instance: %s", status.Message)
}
// Updating the status has no effect on terminating resources.
if err := h.SetStatusComplete(status); err != nil {
return err
}
return nil
}
// DeleteAll deletes all Istio resources in the cluster.
func (h *HelmReconciler) DeleteAll() error {
manifestMap := name.ManifestMap{}
for _, c := range name.AllComponentNames {
manifestMap[c] = nil
}
return h.Prune(manifestMap, true)
}
// SetStatusBegin updates the status field on the IstioOperator instance before reconciling.
func (h *HelmReconciler) SetStatusBegin() error {
isop := &istioV1Alpha1.IstioOperator{}
namespacedName := types.NamespacedName{
Name: h.iop.Name,
Namespace: h.iop.Namespace,
}
if err := h.getClient().Get(context.TODO(), namespacedName, isop); err != nil {
if runtime.IsNotRegisteredError(err) {
// CRD not yet installed in cluster, nothing to update.
return nil
}
return fmt.Errorf("failed to get IstioOperator before updating status due to %v", err)
}
if isop.Status == nil {
isop.Status = &v1alpha1.InstallStatus{Status: v1alpha1.InstallStatus_RECONCILING}
} else {
cs := isop.Status.ComponentStatus
for cn := range cs {
cs[cn] = &v1alpha1.InstallStatus_VersionStatus{
Status: v1alpha1.InstallStatus_RECONCILING,
}
}
isop.Status.Status = v1alpha1.InstallStatus_RECONCILING
}
return h.getClient().Status().Update(context.TODO(), isop)
}
// SetStatusComplete updates the status field on the IstioOperator instance to the given status.
func (h *HelmReconciler) SetStatusComplete(status *v1alpha1.InstallStatus) error {
iop := &istioV1Alpha1.IstioOperator{}
namespacedName := types.NamespacedName{
Name: h.iop.Name,
Namespace: h.iop.Namespace,
}
if err := h.getClient().Get(context.TODO(), namespacedName, iop); err != nil {
return fmt.Errorf("failed to get IstioOperator before updating status due to %v", err)
}
iop.Status = status
return h.getClient().Status().Update(context.TODO(), iop)
}
// setStatus sets the status for the component with the given name, which is a key in the given map.
// If the status is InstallStatus_NONE, the component name is deleted from the map.
// Otherwise, if the map key/value is missing, one is created.
func setStatus(s map[string]*v1alpha1.InstallStatus_VersionStatus, componentName name.ComponentName, status v1alpha1.InstallStatus_Status, err error) {
cn := string(componentName)
if status == v1alpha1.InstallStatus_NONE {
delete(s, cn)
return
}
if _, ok := s[cn]; !ok {
s[cn] = &v1alpha1.InstallStatus_VersionStatus{}
}
s[cn].Status = status
if err != nil {
s[cn].Error = err.Error()
}
}
// overallStatus returns the summary status over all components.
// - If all components are HEALTHY, overall status is HEALTHY.
// - If one or more components are RECONCILING and others are HEALTHY, overall status is RECONCILING.
// - If one or more components are UPDATING and others are HEALTHY, overall status is UPDATING.
// - If components are a mix of RECONCILING, UPDATING and HEALTHY, overall status is UPDATING.
// - If any component is in ERROR state, overall status is ERROR.
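// For example, {HEALTHY, RECONCILING} reports RECONCILING, {HEALTHY, RECONCILING, UPDATING}
// reports UPDATING, and any ERROR reports ERROR regardless of the other components.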
func overallStatus(componentStatus map[string]*v1alpha1.InstallStatus_VersionStatus) v1alpha1.InstallStatus_Status {
ret := v1alpha1.InstallStatus_HEALTHY
for _, cs := range componentStatus {
switch cs.Status {
case v1alpha1.InstallStatus_ERROR:
// ERROR dominates every other state, so stop scanning.
return v1alpha1.InstallStatus_ERROR
case v1alpha1.InstallStatus_UPDATING:
ret = v1alpha1.InstallStatus_UPDATING
case v1alpha1.InstallStatus_RECONCILING:
// A mix of RECONCILING and UPDATING reports UPDATING.
if ret != v1alpha1.InstallStatus_UPDATING {
ret = v1alpha1.InstallStatus_RECONCILING
}
}
}
return ret
}
// getCoreOwnerLabels returns a map of labels for associating installation resources. These are the common
// labels shared by all resources; see getOwnerLabels for the per-component labels.
func (h *HelmReconciler) getCoreOwnerLabels() (map[string]string, error) {
crName, err := h.getCRName()
if err != nil {
return nil, err
}
crNamespace, err := h.getCRNamespace()
if err != nil {
return nil, err
}
labels := make(map[string]string)
labels[operatorLabelStr] = operatorReconcileStr
if crName != "" {
labels[OwningResourceName] = crName
}
if crNamespace != "" {
labels[OwningResourceNamespace] = crNamespace
}
labels[istioVersionLabelStr] = version.Info.Version
revision := ""
if h.iop != nil {
revision = h.iop.Spec.Revision
}
if revision == "" {
revision = "default"
}
labels[label.IoIstioRev.Name] = revision
return labels, nil
}
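// addComponentLabels returns a copy of coreLabels with the component name label added.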
func (h *HelmReconciler) addComponentLabels(coreLabels map[string]string, componentName string) map[string]string {
labels := map[string]string{}
for k, v := range coreLabels {
labels[k] = v
}
labels[IstioComponentLabelStr] = componentName
return labels
}
// getOwnerLabels returns a map of labels for the given component name, revision and owning CR resource name.
func (h *HelmReconciler) getOwnerLabels(componentName string) (map[string]string, error) {
labels, err := h.getCoreOwnerLabels()
if err != nil {
return nil, err
}
return h.addComponentLabels(labels, componentName), nil
}
// applyLabelsAndAnnotations applies the owner labels to the object (despite the name, no annotations are currently set).
func (h *HelmReconciler) applyLabelsAndAnnotations(obj runtime.Object, componentName string) error {
labels, err := h.getOwnerLabels(componentName)
if err != nil {
return err
}
for k, v := range labels {
err := util.SetLabel(obj, k, v)
if err != nil {
return err
}
}
return nil
}
// getCRName returns the name of the CR associated with h.
func (h *HelmReconciler) getCRName() (string, error) {
if h.iop == nil {
return "", nil
}
objAccessor, err := meta.Accessor(h.iop)
if err != nil {
return "", err
}
return objAccessor.GetName(), nil
}
// getCRHash returns a cluster-unique identifier for the given component of the CR associated with h,
// built from the CR name, namespace, component name, and API server host.
func (h *HelmReconciler) getCRHash(componentName string) (string, error) {
crName, err := h.getCRName()
if err != nil {
return "", err
}
crNamespace, err := h.getCRNamespace()
if err != nil {
return "", err
}
var host string
if h.kubeClient != nil && h.kubeClient.RESTConfig() != nil {
host = h.kubeClient.RESTConfig().Host
}
return strings.Join([]string{crName, crNamespace, componentName, host}, "-"), nil
}
// getCRNamespace returns the namespace of the CR associated with h.
func (h *HelmReconciler) getCRNamespace() (string, error) {
if h.iop == nil {
return "", nil
}
objAccessor, err := meta.Accessor(h.iop)
if err != nil {
return "", err
}
return objAccessor.GetNamespace(), nil
}
// getClient returns the Kubernetes client associated with this HelmReconciler.
func (h *HelmReconciler) getClient() client.Client {
return h.client
}
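// addPrunedKind records a pruned GroupKind so reportPrunedObjectKind can report it to metrics.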
func (h *HelmReconciler) addPrunedKind(gk schema.GroupKind) {
h.countLock.Lock()
defer h.countLock.Unlock()
h.prunedKindSet[gk] = struct{}{}
}
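// reportPrunedObjectKind increments the resource prune metric once for each recorded kind.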
func (h *HelmReconciler) reportPrunedObjectKind() {
h.countLock.Lock()
defer h.countLock.Unlock()
for gk := range h.prunedKindSet {
metrics.ResourcePruneTotal.
With(metrics.ResourceKindLabel.Value(util.GKString(gk))).
Increment()
}
}
// CreateNamespace creates a namespace using the given k8s interface.
func CreateNamespace(cs kubernetes.Interface, namespace string, network string, dryRun bool) error {
if dryRun {
scope.Infof("Not applying Namespace %s because of dry run.", namespace)
return nil
}
if namespace == "" {
// Fall back to the default Istio namespace when none is given.
namespace = name.IstioDefaultNamespace
}
// Check whether the namespace already exists; create it only if it does not.
_, err := cs.CoreV1().Namespaces().Get(context.TODO(), namespace, v12.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
ns := &v1.Namespace{ObjectMeta: v12.ObjectMeta{
Name: namespace,
Labels: map[string]string{},
}}
if network != "" {
ns.Labels[label.TopologyNetwork.Name] = network
}
_, err := cs.CoreV1().Namespaces().Create(context.TODO(), ns, v12.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create namespace %v: %v", namespace, err)
}
} else {
return fmt.Errorf("failed to check if namespace %v exists: %v", namespace, err)
}
}
return nil
}
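// A quick way to exercise CreateNamespace without a cluster is the fake clientset
// (a sketch, e.g. in a test; "network1" is an arbitrary example value, and fake is
// "k8s.io/client-go/kubernetes/fake"):
//
//	cs := fake.NewSimpleClientset()
//	if err := CreateNamespace(cs, "istio-system", "network1", false); err != nil {
//		t.Fatal(err)
//	}
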
// analyzeWebhooks runs the webhook analyzer over the rendered webhook configurations,
// together with any already in the cluster, and returns an error if conflicts are found.
func (h *HelmReconciler) analyzeWebhooks(whs []string) error {
if len(whs) == 0 {
return nil
}
sa := local.NewSourceAnalyzer(analysis.Combine("webhook", &webhook.Analyzer{
SkipServiceCheck: true,
}),
resource.Namespace(h.iop.Spec.GetNamespace()), resource.Namespace(istioV1Alpha1.Namespace(h.iop.Spec)), nil, true, 30*time.Second)
var localWebhookYAMLReaders []local.ReaderSource
var parsedK8sObjects object.K8sObjects
for _, wh := range whs {
k8sObjects, err := object.ParseK8sObjectsFromYAMLManifest(wh)
if err != nil {
return err
}
objYaml, err := k8sObjects.YAMLManifest()
if err != nil {
return err
}
whReaderSource := local.ReaderSource{
Name: "",
Reader: strings.NewReader(objYaml),
}
localWebhookYAMLReaders = append(localWebhookYAMLReaders, whReaderSource)
parsedK8sObjects = append(parsedK8sObjects, k8sObjects...)
}
err := sa.AddReaderKubeSource(localWebhookYAMLReaders)
if err != nil {
return err
}
if h.kubeClient != nil {
sa.AddRunningKubeSource(h.kubeClient)
}
// Analyze webhooks
res, err := sa.Analyze(make(chan struct{}))
if err != nil {
return err
}
relevantMessages := res.Messages.FilterOutBasedOnResources(parsedK8sObjects)
if len(relevantMessages) > 0 {
o, err := formatting.Print(relevantMessages, formatting.LogFormat, false)
if err != nil {
return err
}
return fmt.Errorf("creating default tag would conflict:\n%v", o)
}
return nil
}
// createNamespace creates a namespace using the given k8s client.
func (h *HelmReconciler) createNamespace(namespace string, network string) error {
return CreateNamespace(h.kubeClient, namespace, network, h.opts.DryRun)
}
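// networkName returns the cluster's network name, read from the IstioOperator's
// spec.values.global.network field. For example (the value "network1" is arbitrary):
//
//	spec:
//	  values:
//	    global:
//	      network: network1
//
// yields "network1"; an unset or non-string value yields "".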
func (h *HelmReconciler) networkName() string {
if h.iop.Spec.GetValues() == nil {
return ""
}
globalI := h.iop.Spec.Values.AsMap()["global"]
global, ok := globalI.(map[string]interface{})
if !ok {
return ""
}
nw, ok := global["network"].(string)
if !ok {
return ""
}
return nw
}