blob: 2b5201827952771efa26e3f7fad6ba1b125c9859 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"context"
)
import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"
kube_apierrs "k8s.io/apimachinery/pkg/api/errors"
kube_meta "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_runtime "k8s.io/apimachinery/pkg/runtime"
kube_types "k8s.io/apimachinery/pkg/types"
kube_record "k8s.io/client-go/tools/record"
kube_ctrl "sigs.k8s.io/controller-runtime"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
kube_controllerutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kube_handler "sigs.k8s.io/controller-runtime/pkg/handler"
kube_reconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"
)
import (
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
k8s_common "github.com/apache/dubbo-kubernetes/pkg/plugins/common/k8s"
mesh_k8s "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/api/v1alpha1"
k8s_model "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/plugins/runtime/k8s/metadata"
util_k8s "github.com/apache/dubbo-kubernetes/pkg/plugins/runtime/k8s/util"
)
// Event reasons attached to the events that PodReconciler records on a Pod.
const (
	// CreatedDubboDataplaneReason is the reason of the Normal event recorded
	// after an object was created for a Pod.
	CreatedDubboDataplaneReason = "CreatedDubboDataplane"
	// UpdatedDubboDataplaneReason is the reason of the Normal event recorded
	// after an existing object was updated for a Pod.
	UpdatedDubboDataplaneReason = "UpdatedDubboDataplane"
	// FailedToGenerateDubboDataplaneReason is the reason of the Warning event
	// recorded when creating or updating an object for a Pod failed.
	FailedToGenerateDubboDataplaneReason = "FailedToGenerateDubboDataplane"
)
// PodReconciler reconciles Pod objects: depending on the annotations found on
// a Pod it maintains a matching Dataplane, ZoneIngress or ZoneEgress object
// that carries the Pod's name and namespace.
type PodReconciler struct {
	// Client reads Pods, Namespaces, Services and Dataplanes and writes or
	// deletes the objects generated for a Pod.
	kube_client.Client
	// EventRecorder records create/update/failure events on the Pod.
	kube_record.EventRecorder

	// Scheme is passed to SetControllerReference when the Pod is made the
	// controller owner of a generated object.
	Scheme *kube_runtime.Scheme
	// Log is the base logger; per-Pod loggers are derived from it.
	Log logr.Logger
	// PodConverter translates a Pod into a Dataplane, ZoneIngress or ZoneEgress.
	PodConverter PodConverter
	// ResourceConverter converts listed Dataplane objects into core resources.
	ResourceConverter k8s_common.Converter
	// SystemNamespace is the only namespace in which ingress and egress Pods
	// are accepted.
	SystemNamespace string
	// IgnoredServiceSelectorLabels is forwarded to the Service matching
	// predicate when looking up the Services that select a Pod.
	IgnoredServiceSelectorLabels []string
}
// Reconcile fetches the Pod named in req and dispatches on its annotations.
//
// The annotations are checked in a fixed order and the first one that is both
// present and enabled wins:
//   - ingress annotation: reconcileZoneIngress
//   - egress annotation: reconcileZoneEgress
//   - xds-enable annotation: reconcileDataplane
//
// A Pod that cannot be found is skipped without error, and so is a Pod that
// carries none of the annotations. A malformed annotation value is returned
// as an error.
func (r *PodReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request) (kube_ctrl.Result, error) {
	logger := r.Log.WithValues("pod", req.NamespacedName)
	logger.V(1).Info("reconcile")

	done := kube_ctrl.Result{}

	// Load the Pod this request refers to.
	var pod kube_core.Pod
	switch err := r.Get(ctx, req.NamespacedName, &pod); {
	case err == nil:
		// Pod found, carry on.
	case kube_apierrs.IsNotFound(err):
		logger.V(1).Info("pod not found. Skipping")
		return done, nil
	default:
		logger.Error(err, "unable to fetch Pod")
		return done, err
	}

	podAnnotations := metadata.Annotations(pod.Annotations)

	// Pods marked with the ingress annotation are handled as zone ingresses.
	ingressOn, ingressSet, err := podAnnotations.GetEnabled(metadata.DubboIngressAnnotation)
	if err != nil {
		return done, err
	}
	if ingressSet && ingressOn {
		return done, r.reconcileZoneIngress(ctx, &pod, logger)
	}

	// Pods marked with the egress annotation are handled as zone egresses.
	egressOn, egressSet, err := podAnnotations.GetEnabled(metadata.DubboEgressAnnotation)
	if err != nil {
		return done, err
	}
	if egressSet && egressOn {
		return done, r.reconcileZoneEgress(ctx, &pod, logger)
	}

	// Every other Pod gets a Dataplane only when xDS is explicitly enabled.
	xdsOn, xdsSet, err := podAnnotations.GetEnabled(metadata.DubboXdsEnableAnnotation)
	if err != nil {
		return done, err
	}
	if xdsSet && xdsOn {
		return done, r.reconcileDataplane(ctx, &pod, logger)
	}

	return done, nil
}
// reconcileDataplane keeps the Dataplane object named after pod in sync with
// the Pod.
//
// The Dataplane is deleted (if it exists) when the Pod has succeeded, has no
// IP, or was evicted. Nothing is done while the Pod's namespace is
// terminating. Otherwise the Dataplane is created or updated from the Pod,
// the Services that select it and the Dataplanes of the same mesh.
//
// log is the request-scoped logger and is used for every message emitted
// directly by this function.
func (r *PodReconciler) reconcileDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
	ns := kube_core.Namespace{}
	if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil {
		return errors.Wrap(err, "unable to get Namespace for Pod")
	}
	if ns.Status.Phase == kube_core.NamespaceTerminating {
		// Log through the request-scoped logger so the message keeps the
		// "pod" key instead of being emitted without any context.
		log.V(1).Info("namespace is terminating. Ignoring reconciliation")
		return nil
	}

	dp := &mesh_k8s.Dataplane{
		ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
	}
	if pod.Status.Phase == kube_core.PodSucceeded {
		// Remove Dataplane object for Pods that are indefinitely in Succeeded phase, i.e. Jobs
		return r.deleteObjectIfExist(ctx, dp, "pod succeeded", log)
	}
	if pod.Status.PodIP == "" {
		return r.deleteObjectIfExist(ctx, dp, "pod IP is empty", log)
	}
	if pod.Status.Reason == "Evicted" {
		return r.deleteObjectIfExist(ctx, dp, "pod was evicted", log)
	}

	services, err := r.findMatchingServices(ctx, pod)
	if err != nil {
		return err
	}

	// Dataplanes of the same mesh are handed to the converter together with
	// the Pod and its Services.
	others, err := r.findOtherDataplanes(ctx, pod, &ns)
	if err != nil {
		return err
	}

	return r.createOrUpdateDataplane(ctx, pod, &ns, services, others)
}
// deleteObjectIfExist deletes object through the reconciler's client.
//
// A NotFound response is treated as success, since the object is already
// gone. Any other failure is returned wrapped with the object's kind and its
// namespace/name. cause is only used as a logging field explaining why the
// deletion was attempted.
func (r *PodReconciler) deleteObjectIfExist(ctx context.Context, object k8s_model.KubernetesObject, cause string, log logr.Logger) error {
	log = log.WithValues(
		"cause", cause,
		"kind", object.GetObjectKind(),
		"name", object.GetName(),
		"namespace", object.GetNamespace(),
	)
	if err := r.Client.Delete(ctx, object); err != nil {
		if kube_apierrs.IsNotFound(err) {
			log.V(1).Info("Object is not found, nothing to delete")
			return nil
		}
		// Print the identifier as namespace/name, the order used by
		// NamespacedName throughout this file.
		return errors.Wrapf(err, "could not delete %v %s/%s", object.GetObjectKind(), object.GetNamespace(), object.GetName())
	}
	log.Info("Object deleted")
	return nil
}
// reconcileZoneIngress keeps the ZoneIngress object named after pod in sync
// with the Pod.
//
// A Pod without an IP has its ZoneIngress deleted (if it exists). A Pod with
// an IP that lives outside the system namespace is rejected with an error.
// Otherwise the ZoneIngress is created or updated from the Pod and the
// Services that select it.
func (r *PodReconciler) reconcileZoneIngress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
	if pod.Status.PodIP == "" {
		stale := &mesh_k8s.ZoneIngress{}
		stale.ObjectMeta = kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace}
		return r.deleteObjectIfExist(ctx, stale, "pod IP is empty", log)
	}

	if r.SystemNamespace != pod.Namespace {
		return errors.Errorf("Ingress can only be deployed in system namespace %q", r.SystemNamespace)
	}

	matching, err := r.findMatchingServices(ctx, pod)
	if err != nil {
		return err
	}
	return r.createOrUpdateIngress(ctx, pod, matching)
}
// reconcileZoneEgress keeps the ZoneEgress object named after pod in sync
// with the Pod.
//
// A Pod without an IP has its ZoneEgress deleted (if it exists). A Pod with
// an IP that lives outside the system namespace is rejected with an error.
// Otherwise the ZoneEgress is created or updated from the Pod and the
// Services that select it.
func (r *PodReconciler) reconcileZoneEgress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
	if pod.Status.PodIP == "" {
		stale := &mesh_k8s.ZoneEgress{}
		stale.ObjectMeta = kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace}
		return r.deleteObjectIfExist(ctx, stale, "pod IP is empty", log)
	}

	if r.SystemNamespace != pod.Namespace {
		return errors.Errorf("Egress can only be deployed in system namespace %q", r.SystemNamespace)
	}

	matching, err := r.findMatchingServices(ctx, pod)
	if err != nil {
		return err
	}
	return r.createOrUpdateEgress(ctx, pod, matching)
}
// findMatchingServices lists the Services in the Pod's namespace and returns
// those accepted by both the AnySelector predicate and the predicate that
// matches Services selecting pod (built with r.IgnoredServiceSelectorLabels).
// A listing failure is logged and returned.
func (r *PodReconciler) findMatchingServices(ctx context.Context, pod *kube_core.Pod) ([]*kube_core.Service, error) {
	// Services can only select Pods from their own namespace, so that is all
	// that has to be listed.
	var inNamespace kube_core.ServiceList
	if err := r.List(ctx, &inNamespace, kube_client.InNamespace(pod.Namespace)); err != nil {
		podKey := kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		r.Log.WithValues("pod", podKey).Error(err, "unable to list Services", "namespace", pod.Namespace)
		return nil, err
	}

	// Keep only the Services that match this Pod.
	anySelector := util_k8s.AnySelector()
	selectsPod := util_k8s.MatchServiceThatSelectsPod(pod, r.IgnoredServiceSelectorLabels)
	return util_k8s.FindServices(&inNamespace, anySelector, selectsPod), nil
}
// findOtherDataplanes lists every Dataplane visible to the client and returns
// those whose Mesh equals the mesh resolved for pod from its own and its
// namespace's annotations.
//
// Dataplanes that cannot be converted into a core resource are logged and
// left out rather than failing the whole lookup. The result is never nil when
// the listing succeeds.
func (r *PodReconciler) findOtherDataplanes(ctx context.Context, pod *kube_core.Pod, ns *kube_core.Namespace) ([]*mesh_k8s.Dataplane, error) {
	var listed mesh_k8s.DataplaneList
	if err := r.List(ctx, &listed); err != nil {
		podKey := kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		r.Log.WithValues("pod", podKey).Error(err, "unable to list Dataplanes")
		return nil, err
	}

	// Only Dataplanes of the Pod's mesh are of interest.
	podMesh := util_k8s.MeshOfByAnnotation(pod, ns)
	sameMesh := make([]*mesh_k8s.Dataplane, 0)
	for idx := range listed.Items {
		// Work on a per-iteration copy so the returned pointers do not alias
		// the list's backing array.
		candidate := listed.Items[idx]
		if err := r.ResourceConverter.ToCoreResource(&candidate, core_mesh.NewDataplaneResource()); err != nil {
			// one invalid Dataplane definition should not break the entire mesh
			converterLog.Error(err, "failed to parse Dataplane", "dataplane", candidate.Spec)
			continue
		}
		if candidate.Mesh != podMesh {
			continue
		}
		sameMesh = append(sameMesh, &candidate)
	}
	return sameMesh, nil
}
// createOrUpdateDataplane creates or updates the Dataplane named after pod.
//
// The Dataplane content is produced by r.PodConverter from the Pod, its
// namespace, the matching Services and the other Dataplanes, and the Pod is
// set as the Dataplane's controller owner. On success a Normal event is
// recorded on the Pod when the object was created or updated. On failure the
// error is logged and a Warning event is recorded, unless the error is
// context.Canceled; the error is returned in either case.
func (r *PodReconciler) createOrUpdateDataplane(
	ctx context.Context,
	pod *kube_core.Pod,
	ns *kube_core.Namespace,
	services []*kube_core.Service,
	others []*mesh_k8s.Dataplane,
) error {
	podKey := kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	logger := r.Log.WithValues("pod", podKey)

	desired := &mesh_k8s.Dataplane{
		ObjectMeta: kube_meta.ObjectMeta{
			Namespace: pod.Namespace,
			Name:      pod.Name,
		},
	}

	// fill is invoked by CreateOrUpdate to bring the object to its desired state.
	fill := func() error {
		if err := r.PodConverter.PodToDataplane(ctx, desired, pod, ns, services, others); err != nil {
			return errors.Wrap(err, "unable to translate a Pod into a Dataplane")
		}
		if err := kube_controllerutil.SetControllerReference(pod, desired, r.Scheme); err != nil {
			return errors.Wrap(err, "unable to set Dataplane's controller reference to Pod")
		}
		return nil
	}

	outcome, err := kube_controllerutil.CreateOrUpdate(ctx, r.Client, desired, fill)
	if err != nil {
		if errors.Is(err, context.Canceled) {
			// A cancelled context is returned without a log entry or event.
			return err
		}
		logger.Error(err, "unable to create/update Dataplane", "operationResult", outcome)
		r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateDubboDataplaneReason, "Failed to generate Dubbo Dataplane: %s", err.Error())
		return err
	}

	if outcome == kube_controllerutil.OperationResultCreated {
		logger.Info("Dataplane created")
		r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedDubboDataplaneReason, "Created Dubbo Dataplane: %s", pod.Name)
	} else if outcome == kube_controllerutil.OperationResultUpdated {
		logger.Info("Dataplane updated")
		r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedDubboDataplaneReason, "Updated Dubbo Dataplane: %s", pod.Name)
	}
	return nil
}
// createOrUpdateIngress creates or updates the ZoneIngress named after pod.
//
// The ZoneIngress content is produced by r.PodConverter from the Pod and the
// matching Services, and the Pod is set as the ZoneIngress's controller
// owner. On success a Normal event is recorded on the Pod when the object was
// created or updated. On failure the error is logged and a Warning event is
// recorded, unless the error is context.Canceled; the error is returned in
// either case.
func (r *PodReconciler) createOrUpdateIngress(ctx context.Context, pod *kube_core.Pod, services []*kube_core.Service) error {
	ingress := &mesh_k8s.ZoneIngress{
		ObjectMeta: kube_meta.ObjectMeta{
			Namespace: pod.Namespace,
			Name:      pod.Name,
		},
		Mesh: model.NoMesh,
	}
	operationResult, err := kube_controllerutil.CreateOrUpdate(ctx, r.Client, ingress, func() error {
		if err := r.PodConverter.PodToIngress(ctx, ingress, pod, services); err != nil {
			return errors.Wrap(err, "unable to translate a Pod into a Ingress")
		}
		if err := kube_controllerutil.SetControllerReference(pod, ingress, r.Scheme); err != nil {
			return errors.Wrap(err, "unable to set Ingress's controller reference to Pod")
		}
		return nil
	})
	log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
	if err != nil {
		// Same policy as createOrUpdateDataplane: a cancelled context (for
		// example while shutting down) is not a generation failure, so it is
		// returned without an error log or a Warning event on the Pod.
		if !errors.Is(err, context.Canceled) {
			log.Error(err, "unable to create/update Ingress", "operationResult", operationResult)
			r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateDubboDataplaneReason, "Failed to generate Dubbo Ingress: %s", err.Error())
		}
		return err
	}
	switch operationResult {
	case kube_controllerutil.OperationResultCreated:
		log.Info("ZoneIngress created")
		r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedDubboDataplaneReason, "Created Dubbo Ingress: %s", pod.Name)
	case kube_controllerutil.OperationResultUpdated:
		log.Info("ZoneIngress updated")
		r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedDubboDataplaneReason, "Updated Dubbo Ingress: %s", pod.Name)
	}
	return nil
}
// createOrUpdateEgress creates or updates the ZoneEgress named after pod.
//
// The ZoneEgress content is produced by r.PodConverter from the Pod and the
// matching Services, and the Pod is set as the ZoneEgress's controller owner.
// On success a Normal event is recorded on the Pod when the object was
// created or updated. On failure the error is logged and a Warning event is
// recorded, unless the error is context.Canceled; the error is returned in
// either case.
func (r *PodReconciler) createOrUpdateEgress(ctx context.Context, pod *kube_core.Pod, services []*kube_core.Service) error {
	egress := &mesh_k8s.ZoneEgress{
		ObjectMeta: kube_meta.ObjectMeta{
			Namespace: pod.Namespace,
			Name:      pod.Name,
		},
		Mesh: model.NoMesh,
	}
	operationResult, err := kube_controllerutil.CreateOrUpdate(ctx, r.Client, egress, func() error {
		if err := r.PodConverter.PodToEgress(ctx, egress, pod, services); err != nil {
			return errors.Wrap(err, "unable to translate a Pod into a Egress")
		}
		if err := kube_controllerutil.SetControllerReference(pod, egress, r.Scheme); err != nil {
			return errors.Wrap(err, "unable to set Egress's controller reference to Pod")
		}
		return nil
	})
	log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
	if err != nil {
		// Same policy as createOrUpdateDataplane: a cancelled context (for
		// example while shutting down) is not a generation failure, so it is
		// returned without an error log or a Warning event on the Pod.
		if !errors.Is(err, context.Canceled) {
			log.Error(err, "unable to create/update Egress", "operationResult", operationResult)
			r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateDubboDataplaneReason, "Failed to generate Dubbo Egress: %s", err.Error())
		}
		return err
	}
	switch operationResult {
	case kube_controllerutil.OperationResultCreated:
		log.Info("ZoneEgress created")
		r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedDubboDataplaneReason, "Created Dubbo Egress: %s", pod.Name)
	case kube_controllerutil.OperationResultUpdated:
		log.Info("ZoneEgress updated")
		r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedDubboDataplaneReason, "Updated Dubbo Egress: %s", pod.Name)
	}
	return nil
}
// SetupWithManager registers the reconciler with mgr as a controller for
// Pods, running at most maxConcurrentReconciles reconciliations in parallel.
// Service events are additionally mapped to reconcile requests for Pods via
// ServiceToPodsMapper.
func (r *PodReconciler) SetupWithManager(mgr kube_ctrl.Manager, maxConcurrentReconciles int) error {
	opts := controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}

	// on Service update reconcile affected Pods (all Pods selected by this service)
	serviceToPods := kube_handler.EnqueueRequestsFromMapFunc(ServiceToPodsMapper(r.Log, mgr.GetClient()))

	builder := kube_ctrl.NewControllerManagedBy(mgr).
		WithOptions(opts).
		For(&kube_core.Pod{}).
		Watches(&kube_core.Service{}, serviceToPods)
	return builder.Complete(r)
}
// ServiceToPodsMapper returns a MapFunc that turns a Service into reconcile
// requests for the Pods it selects.
//
// The returned function lists the Pods in the Service's namespace whose labels
// match the Service's selector and emits one request per Pod. If the object is
// not a *Service, or the Pods cannot be listed, the problem is logged through
// l (named "service-to-pods-mapper") and no requests are returned.
func ServiceToPodsMapper(l logr.Logger, client kube_client.Client) kube_handler.MapFunc {
	l = l.WithName("service-to-pods-mapper")
	return func(ctx context.Context, obj kube_client.Object) []kube_reconcile.Request {
		// Guard the assertion: an unexpected object type must not panic the
		// controller.
		svc, ok := obj.(*kube_core.Service)
		if !ok {
			l.Error(errors.Errorf("expected a Service, got %T", obj), "unexpected object type")
			return nil
		}

		// List Pods in the same namespace as a Service
		// NOTE(review): with an empty selector MatchingLabels presumably
		// matches every Pod in the namespace — confirm whether selector-less
		// Services should be skipped instead.
		pods := &kube_core.PodList{}
		if err := client.List(ctx, pods, kube_client.InNamespace(svc.GetNamespace()), kube_client.MatchingLabels(svc.Spec.Selector)); err != nil {
			l.WithValues("service", svc.GetName()).Error(err, "failed to fetch Pods")
			return nil
		}

		req := make([]kube_reconcile.Request, 0, len(pods.Items))
		for i := range pods.Items {
			// Index into the slice to avoid copying each Pod struct.
			pod := &pods.Items[i]
			req = append(req, kube_reconcile.Request{
				NamespacedName: kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name},
			})
		}
		return req
	}
}