blob: 134074b99c83505bc7512809a8bae7e360689b8e [file] [log] [blame]
package inject
import (
"encoding/json"
"errors"
"fmt"
opconfig "github.com/apache/dubbo-kubernetes/operator/pkg/apis"
"github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"gomodules.xyz/jsonpatch/v2"
meshconfig "istio.io/api/mesh/v1alpha1"
admissionv1 "k8s.io/api/admission/v1"
kubeApiAdmissionv1beta1 "k8s.io/api/admission/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"net/http"
"os"
"sigs.k8s.io/yaml"
"strings"
"sync"
"text/template"
"time"
)
var (
runtimeScheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(runtimeScheme)
deserializer = codecs.UniversalDeserializer()
)
func init() {
_ = corev1.AddToScheme(runtimeScheme)
_ = admissionv1.AddToScheme(runtimeScheme)
_ = kubeApiAdmissionv1beta1.AddToScheme(runtimeScheme)
}
const (
watchDebounceDelay = 100 * time.Millisecond
)
type Webhook struct {
mu sync.RWMutex
watcher Watcher
meshConfig *meshconfig.MeshConfig
env *model.Environment
Config *Config
valuesConfig ValuesConfig
revision string
}
func NewWebhook(p WebhookParameters) (*Webhook, error) {
if p.Mux == nil {
return nil, errors.New("expected mux to be passed, but was not passed")
}
wh := &Webhook{
watcher: p.Watcher,
meshConfig: p.Env.Mesh(),
env: p.Env,
revision: p.Revision,
}
proxylessConfig, valuesConfig, err := p.Watcher.Get()
if err != nil {
return nil, fmt.Errorf("failed to get initial configuration: %v", err)
}
if err := wh.updateConfig(proxylessConfig, valuesConfig); err != nil {
return nil, fmt.Errorf("failed to process webhook config: %v", err)
}
p.Mux.HandleFunc("/inject", wh.serveInject)
p.Mux.HandleFunc("/inject/", wh.serveInject)
p.Env.Watcher.AddMeshHandler(func() {
wh.mu.Lock()
wh.meshConfig = p.Env.Mesh()
wh.mu.Unlock()
})
return wh, nil
}
type WebhookParameters struct {
Watcher Watcher
Port int
Env *model.Environment
Mux *http.ServeMux
Revision string
}
type ValuesConfig struct {
raw string
asStruct *opconfig.Values
asMap map[string]any
}
func loadConfig(injectFile, valuesFile string) (*Config, string, error) {
data, err := os.ReadFile(injectFile)
if err != nil {
return nil, "", err
}
var c *Config
if c, err = unmarshalConfig(data); err != nil {
klog.Warningf("Failed to parse injectFile %s", string(data))
return nil, "", err
}
valuesConfig, err := os.ReadFile(valuesFile)
if err != nil {
return nil, "", err
}
return c, string(valuesConfig), nil
}
func unmarshalConfig(data []byte) (*Config, error) {
c, err := UnmarshalConfig(data)
if err != nil {
return nil, err
}
return &c, nil
}
func (wh *Webhook) updateConfig(sidecarConfig *Config, valuesConfig string) error {
wh.mu.Lock()
defer wh.mu.Unlock()
wh.Config = sidecarConfig
vc, err := NewValuesConfig(valuesConfig)
if err != nil {
return fmt.Errorf("failed to create new values config: %v", err)
}
wh.valuesConfig = vc
return nil
}
func NewValuesConfig(v string) (ValuesConfig, error) {
c := ValuesConfig{raw: v}
valuesStruct := &opconfig.Values{}
if err := protomarshal.ApplyYAML(v, valuesStruct); err != nil {
return c, fmt.Errorf("could not parse configuration values: %v", err)
}
c.asStruct = valuesStruct
values := map[string]any{}
if err := yaml.Unmarshal([]byte(v), &values); err != nil {
return c, fmt.Errorf("could not parse configuration values: %v", err)
}
c.asMap = values
return c, nil
}
func (wh *Webhook) serveInject(w http.ResponseWriter, r *http.Request) {
var body []byte
if r.Body != nil {
if data, err := kube.HTTPConfigReader(r); err == nil {
body = data
} else {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
if len(body) == 0 {
http.Error(w, "no body found", http.StatusBadRequest)
return
}
// verify the content type is accurate
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
http.Error(w, "invalid Content-Type, want `application/json`", http.StatusUnsupportedMediaType)
return
}
path := ""
if r.URL != nil {
path = r.URL.Path
}
var reviewResponse *kube.AdmissionResponse
var obj runtime.Object
var ar *kube.AdmissionReview
if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
reviewResponse = toAdmissionResponse(err)
} else {
ar, err = kube.AdmissionReviewKubeToAdapter(out)
if err != nil {
reviewResponse = toAdmissionResponse(err)
} else {
reviewResponse = wh.inject(ar, path)
}
}
response := kube.AdmissionReview{}
response.Response = reviewResponse
var responseKube runtime.Object
var apiVersion string
if ar != nil {
apiVersion = ar.APIVersion
response.TypeMeta = ar.TypeMeta
if response.Response != nil {
if ar.Request != nil {
response.Response.UID = ar.Request.UID
}
}
}
responseKube = kube.AdmissionReviewAdapterToKube(&response, apiVersion)
resp, err := json.Marshal(responseKube)
if err != nil {
http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
return
}
if _, err := w.Write(resp); err != nil {
klog.Errorf("Could not write response: %v", err)
http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
return
}
}
type InjectionParameters struct {
pod *corev1.Pod
deployMeta types.NamespacedName
namespace *corev1.Namespace
typeMeta metav1.TypeMeta
templates map[string]*template.Template
defaultTemplate []string
aliases map[string][]string
meshConfig *meshconfig.MeshConfig
proxyConfig *meshconfig.ProxyConfig
valuesConfig ValuesConfig
revision string
proxyEnvs map[string]string
injectedAnnotations map[string]string
}
func injectPod(req InjectionParameters) ([]byte, error) {
// The patch will be built relative to the initial pod, capture its current state
originalPodSpec, err := json.Marshal(req.pod)
if err != nil {
return nil, err
}
// Run the injection template, giving us a partial pod spec
mergedPod, injectedPodData, err := RunTemplate(req)
if err != nil {
return nil, fmt.Errorf("failed to run injection template: %v", err)
}
mergedPod, err = reapplyOverwrittenContainers(mergedPod, req.pod, injectedPodData, req.proxyConfig)
if err != nil {
return nil, fmt.Errorf("failed to re apply container: %v", err)
}
// Apply some additional transformations to the pod
if err := postProcessPod(mergedPod, *injectedPodData, req); err != nil {
return nil, fmt.Errorf("failed to process pod: %v", err)
}
patch, err := createPatch(mergedPod, originalPodSpec)
if err != nil {
return nil, fmt.Errorf("failed to create patch: %v", err)
}
return patch, nil
}
func reapplyOverwrittenContainers(finalPod *corev1.Pod, originalPod *corev1.Pod, templatePod *corev1.Pod, proxyConfig *meshconfig.ProxyConfig) (*corev1.Pod, error) {
return finalPod, nil
}
func createPatch(pod *corev1.Pod, original []byte) ([]byte, error) {
reinjected, err := json.Marshal(pod)
if err != nil {
return nil, err
}
p, err := jsonpatch.CreatePatch(original, reinjected)
if err != nil {
return nil, err
}
return json.Marshal(p)
}
func postProcessPod(pod *corev1.Pod, injectedPod corev1.Pod, req InjectionParameters) error {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
return nil
}
func parseInjectEnvs(path string) map[string]string {
path = strings.TrimSuffix(path, "/")
res := func(path string) []string {
parts := strings.SplitN(path, "/", 3)
var newRes []string
if len(parts) == 3 { // If length is less than 3, then the path is simply "/inject".
if strings.HasPrefix(parts[2], ":ENV:") {
// Deprecated, not recommended.
// Note that this syntax fails validation when used to set injectionPath (i.e., service.path in mwh).
// It doesn't fail validation when used to set injectionURL, however. K8s bug maybe?
pairs := strings.Split(parts[2], ":ENV:")
for i := 1; i < len(pairs); i++ { // skip the first part, it is a nil
pair := strings.SplitN(pairs[i], "=", 2)
// The first part is the variable name which can not be empty
// the second part is the variable value which can be empty but has to exist
// for example, aaa=bbb, aaa= are valid, but =aaa or = are not valid, the
// invalid ones will be ignored.
if len(pair[0]) > 0 && len(pair) == 2 {
newRes = append(newRes, pair...)
}
}
return newRes
}
newRes = strings.Split(parts[2], "/")
}
for i, value := range newRes {
if i%2 != 0 {
// Replace --slash-- with / in values.
newRes[i] = strings.ReplaceAll(value, "--slash--", "/")
}
}
return newRes
}(path)
newEnvs := make(map[string]string)
for i := 0; i < len(res); i += 2 {
k := res[i]
if i == len(res)-1 { // ignore the last key without value
klog.Warningf("Odd number of inject env entries, ignore the last key %s\n", k)
break
}
}
return newEnvs
}
func (wh *Webhook) inject(ar *kube.AdmissionReview, path string) *kube.AdmissionResponse {
req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
return toAdmissionResponse(err)
}
pod.ManagedFields = nil
potentialPodName(pod.ObjectMeta)
if pod.ObjectMeta.Namespace == "" {
pod.ObjectMeta.Namespace = req.Namespace
}
klog.Infof("Process proxyless injection request")
proxyConfig := wh.env.GetProxyConfigOrDefault(pod.Namespace, pod.Labels, pod.Annotations, wh.meshConfig)
deploy, typeMeta := kube.GetDeployMetaFromPod(&pod)
params := InjectionParameters{
pod: &pod,
deployMeta: deploy,
typeMeta: typeMeta,
templates: wh.Config.Templates,
defaultTemplate: wh.Config.DefaultTemplates,
aliases: wh.Config.Aliases,
meshConfig: wh.meshConfig,
proxyConfig: proxyConfig,
valuesConfig: wh.valuesConfig,
injectedAnnotations: wh.Config.InjectedAnnotations,
proxyEnvs: parseInjectEnvs(path),
revision: wh.revision,
}
patchBytes, err := injectPod(params)
if err != nil {
return toAdmissionResponse(err)
}
reviewResponse := kube.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
PatchType: func() *string {
pt := "JSONPatch"
return &pt
}(),
}
return &reviewResponse
}
func (wh *Webhook) Run(stop <-chan struct{}) {
go wh.watcher.Run(stop)
}
func ParseTemplates(tmpls RawTemplates) (Templates, error) {
ret := make(Templates, len(tmpls))
for k, t := range tmpls {
p, err := parseDryTemplate(t, InjectionFuncmap)
if err != nil {
return nil, err
}
ret[k] = p
}
return ret, nil
}
func toAdmissionResponse(err error) *kube.AdmissionResponse {
return &kube.AdmissionResponse{Result: &metav1.Status{Message: err.Error()}}
}