blob: c1df58a42533d6c28cafaa7aeffd66971afcfdaf [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inject
import (
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
"text/template"
"time"
)
import (
"github.com/prometheus/prometheus/util/strutil"
"gomodules.xyz/jsonpatch/v3"
"istio.io/api/annotation"
"istio.io/api/label"
meshconfig "istio.io/api/mesh/v1alpha1"
"istio.io/pkg/log"
kubeApiAdmissionv1 "k8s.io/api/admission/v1"
kubeApiAdmissionv1beta1 "k8s.io/api/admission/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
kjson "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/util/mergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"sigs.k8s.io/yaml"
)
import (
opconfig "github.com/apache/dubbo-go-pixiu/operator/pkg/apis/istio/v1alpha1"
"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/status"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
var (
// runtimeScheme is the scheme that init populates with the core/v1 and
// admission v1/v1beta1 types.
runtimeScheme = runtime.NewScheme()
// codecs is the codec factory built on top of runtimeScheme.
codecs = serializer.NewCodecFactory(runtimeScheme)
// deserializer is used by serveInject to decode incoming request bodies.
deserializer = codecs.UniversalDeserializer()
// jsonSerializer is a JSON serializer over runtimeScheme created with default options.
jsonSerializer = kjson.NewSerializerWithOptions(kjson.DefaultMetaFactory, runtimeScheme, runtimeScheme, kjson.SerializerOptions{})
// URLParameterToEnv maps the short keys accepted in the inject URL path to the
// environment variable names they stand for (see parseInjectEnvs).
URLParameterToEnv = map[string]string{
"cluster": "ISTIO_META_CLUSTER_ID",
"net": "ISTIO_META_NETWORK",
}
)
func init() {
_ = corev1.AddToScheme(runtimeScheme)
_ = kubeApiAdmissionv1.AddToScheme(runtimeScheme)
_ = kubeApiAdmissionv1beta1.AddToScheme(runtimeScheme)
}
const (
// Prometheus converts annotation names to this sanitized format, so
// `prometheus.io/scrape`, `prometheus.io.scrape` and `prometheus-io/scrape` all have the same meaning in Prometheus.
// For more details, see
// https://github.com/prometheus/prometheus/blob/71a0f42331566a8849863d77078083edbb0b3bc4/util/strutil/strconv.go#L40
prometheusScrapeAnnotation = "prometheus_io_scrape"
prometheusPortAnnotation = "prometheus_io_port"
prometheusPathAnnotation = "prometheus_io_path"
// watchDebounceDelay is not referenced in this part of the file; presumably it is
// the debounce interval of the configuration watcher — TODO confirm.
watchDebounceDelay = 100 * time.Millisecond
)
// Webhook implements a mutating webhook for automatic proxy injection.
type Webhook struct {
// mu guards Config, meshConfig and valuesConfig: updateConfig and the mesh
// handler registered in NewWebhook write them under the lock, inject reads
// them under the read lock.
mu sync.RWMutex
// Config is the current sidecar injection configuration.
Config *Config
// meshConfig is the mesh configuration snapshot handed to each injection.
meshConfig *meshconfig.MeshConfig
// valuesConfig is the parsed values configuration handed to each injection.
valuesConfig ValuesConfig
// watcher delivers injection configuration updates; it is started by Run.
watcher Watcher
// env is the environment the webhook was created with.
env *model.Environment
// revision is the istio.io/rev this injector is responsible for.
revision string
}
// loadConfig reads and parses the injection configuration stored in injectFile
// and returns it together with the raw contents of valuesFile.
//
// Any read or parse error is returned as-is; when parsing fails the offending
// file contents are logged as a warning first.
func loadConfig(injectFile, valuesFile string) (*Config, string, error) {
	injectData, err := os.ReadFile(injectFile)
	if err != nil {
		return nil, "", err
	}

	cfg, err := unmarshalConfig(injectData)
	if err != nil {
		log.Warnf("Failed to parse injectFile %s", string(injectData))
		return nil, "", err
	}

	valuesData, err := os.ReadFile(valuesFile)
	if err != nil {
		return nil, "", err
	}
	return cfg, string(valuesData), nil
}
// unmarshalConfig parses data into an injection Config via UnmarshalConfig and
// logs a summary of the result at debug level.
//
// It returns a pointer to the parsed configuration, or the parse error.
func unmarshalConfig(data []byte) (*Config, error) {
	c, err := UnmarshalConfig(data)
	if err != nil {
		return nil, err
	}

	log.Debugf("New inject configuration: sha256sum %x", sha256.Sum256(data))
	log.Debugf("Policy: %v", c.Policy)
	log.Debugf("AlwaysInjectSelector: %v", c.AlwaysInjectSelector)
	log.Debugf("NeverInjectSelector: %v", c.NeverInjectSelector)
	// The format has a single verb; passing further arguments would only add
	// "%!(EXTRA ...)" noise to the log line.
	log.Debugf("Templates: |\n %v", c.RawTemplates)
	return &c, nil
}
// WebhookParameters configures parameters for the sidecar injection
// webhook.
type WebhookParameters struct {
// Watcher watches the sidecar injection configuration.
Watcher Watcher
// Port is the webhook port, e.g. typically 443 for https.
// This is mainly used for tests. Webhook runs on the port started by Istiod.
Port int
// Env provides the mesh configuration (Env.Mesh) and the mesh watcher
// (Env.Watcher) that NewWebhook subscribes to.
Env *model.Environment
// Mux is the existing mux the inject handlers are registered on.
// NewWebhook returns an error when it is nil.
Mux *http.ServeMux
// The istio.io/rev this injector is responsible for
Revision string
}
// NewWebhook creates a new instance of a mutating webhook for automatic sidecar injection.
//
// It requires p.Mux to be set, subscribes to configuration updates from
// p.Watcher, loads the initial configuration, registers the "/inject" handlers
// on the mux and keeps the mesh configuration current through the mesh watcher.
// A failure to apply the initial configuration is logged, not returned.
func NewWebhook(p WebhookParameters) (*Webhook, error) {
	if p.Mux == nil {
		return nil, errors.New("expected mux to be passed, but was not passed")
	}

	wh := &Webhook{
		watcher:    p.Watcher,
		meshConfig: p.Env.Mesh(),
		env:        p.Env,
		revision:   p.Revision,
	}

	// Subscribe first, then apply whatever the watcher currently holds.
	p.Watcher.SetHandler(wh.updateConfig)
	initialConfig, initialValues, err := p.Watcher.Get()
	if err != nil {
		return nil, err
	}
	if cfgErr := wh.updateConfig(initialConfig, initialValues); cfgErr != nil {
		log.Errorf("failed to process webhook config: %v", cfgErr)
	}

	for _, pattern := range []string{"/inject", "/inject/"} {
		p.Mux.HandleFunc(pattern, wh.serveInject)
	}

	// Swap in the latest mesh configuration whenever the mesh watcher fires.
	refreshMesh := func() {
		wh.mu.Lock()
		wh.meshConfig = p.Env.Mesh()
		wh.mu.Unlock()
	}
	p.Env.Watcher.AddMeshHandler(refreshMesh)

	return wh, nil
}
// Run starts the configuration watcher in its own goroutine, passing it the
// stop channel, and returns immediately.
func (wh *Webhook) Run(stop <-chan struct{}) {
	watcher := wh.watcher
	go watcher.Run(stop)
}
// updateConfig installs a new injection configuration and values under the
// write lock. It returns the error from parsing valuesConfig, if any.
func (wh *Webhook) updateConfig(sidecarConfig *Config, valuesConfig string) error {
	wh.mu.Lock()
	defer wh.mu.Unlock()

	// The sidecar config is stored before the values are parsed, so it is
	// replaced even when the values turn out to be invalid.
	wh.Config = sidecarConfig

	parsed, err := NewValuesConfig(valuesConfig)
	if err == nil {
		wh.valuesConfig = parsed
	}
	return err
}
// ContainerReorder selects what modifyContainers does with the container
// whose name matches.
type ContainerReorder int
const (
// MoveFirst places the matching container at the front of the list.
MoveFirst ContainerReorder = iota
// MoveLast places the matching container at the end of the list.
MoveLast
// Remove drops the matching container from the list.
Remove
)
// modifyContainers returns a copy of cl in which the container called name has
// been moved to the front, moved to the back, or removed, depending on modifier.
//
// When no container matches, a copy of the list is returned. When several
// containers share the name, only the last one is kept. An unrecognised
// modifier returns cl untouched.
func modifyContainers(cl []corev1.Container, name string, modifier ContainerReorder) []corev1.Container {
	others := []corev1.Container{}
	var found *corev1.Container
	for i := range cl {
		if cl[i].Name == name {
			hit := cl[i]
			found = &hit
			continue
		}
		others = append(others, cl[i])
	}

	if found == nil {
		return others
	}

	if modifier == MoveFirst {
		return append([]corev1.Container{*found}, others...)
	}
	if modifier == MoveLast {
		return append(others, *found)
	}
	if modifier == Remove {
		return others
	}
	return cl
}
// enablePrometheusMerge reports whether prometheus metrics merging is enabled.
//
// Precedence, highest first:
//  1. the PrometheusMergeMetrics annotation in anno, when it parses as a bool;
//  2. the mesh-wide EnablePrometheusMerge setting, when present;
//  3. enabled by default.
func enablePrometheusMerge(mesh *meshconfig.MeshConfig, anno map[string]string) bool {
	// If annotation is present, we look there first
	if val, f := anno[annotation.PrometheusMergeMetrics.Name]; f {
		bval, err := strconv.ParseBool(val)
		if err == nil {
			return bval
		}
		// This shouldn't happen since we validate earlier in the code.
		// Log the raw annotation value; the parsed bool is always false here.
		log.Warnf("invalid annotation %v=%v", annotation.PrometheusMergeMetrics.Name, val)
	}

	// If mesh config setting is present, use that
	if mesh.GetEnablePrometheusMerge() != nil {
		return mesh.GetEnablePrometheusMerge().Value
	}

	// Otherwise, we default to enable
	return true
}
// toAdmissionResponse wraps err in an admission response whose result status
// carries the error message. err must not be nil.
func toAdmissionResponse(err error) *kube.AdmissionResponse {
	result := &metav1.Status{Message: err.Error()}
	return &kube.AdmissionResponse{Result: result}
}
// ParseTemplates parses every raw template in tmpls with InjectionFuncmap and
// returns the parsed templates under the same keys. The first parse error
// aborts the operation and is returned.
func ParseTemplates(tmpls RawTemplates) (Templates, error) {
	parsed := make(Templates, len(tmpls))
	for name, raw := range tmpls {
		tmpl, err := parseDryTemplate(raw, InjectionFuncmap)
		if err != nil {
			return nil, err
		}
		parsed[name] = tmpl
	}
	return parsed, nil
}
// ValuesConfig holds the injection values in three forms, all populated by
// NewValuesConfig.
type ValuesConfig struct {
// raw is the YAML text exactly as provided.
raw string
// asStruct is raw decoded into the typed operator Values message.
asStruct *opconfig.Values
// asMap is raw decoded into a generic map.
asMap map[string]interface{}
}
// NewValuesConfig parses the YAML document v into both its typed and its
// generic-map representation.
//
// On error, the returned ValuesConfig holds whatever was populated before the
// failure and should not be relied on.
func NewValuesConfig(v string) (ValuesConfig, error) {
	cfg := ValuesConfig{raw: v}

	typed := &opconfig.Values{}
	if err := protomarshal.ApplyYAML(v, typed); err != nil {
		return cfg, fmt.Errorf("could not parse configuration values: %v", err)
	}
	cfg.asStruct = typed

	untyped := make(map[string]interface{})
	if err := yaml.Unmarshal([]byte(v), &untyped); err != nil {
		return cfg, fmt.Errorf("could not parse configuration values: %v", err)
	}
	cfg.asMap = untyped

	return cfg, nil
}
// InjectionParameters bundles everything injectPod needs for a single pod.
// Webhook.inject fills it from the webhook state and the admission request.
type InjectionParameters struct {
// pod is the pod being injected.
pod *corev1.Pod
// deployMeta and typeMeta are the values kube.GetDeployMetaFromPod returns for pod.
deployMeta metav1.ObjectMeta
typeMeta metav1.TypeMeta
// templates, defaultTemplate and aliases are copied from the injection Config
// (Templates, DefaultTemplates and Aliases respectively).
templates map[string]*template.Template
defaultTemplate []string
aliases map[string][]string
// meshConfig is the mesh configuration in effect for this injection.
meshConfig *meshconfig.MeshConfig
// proxyConfig is the proxy configuration selected for this pod.
proxyConfig *meshconfig.ProxyConfig
// valuesConfig is the parsed values configuration.
valuesConfig ValuesConfig
// revision is the revision of the injector handling the request.
revision string
// proxyEnvs holds the environment variables parsed from the inject URL path.
proxyEnvs map[string]string
// injectedAnnotations are extra annotations copied onto the pod by applyMetadata.
injectedAnnotations map[string]string
}
// checkPreconditions logs a warning when the pod sets a DNSPolicy other than
// ClusterFirst, since the Envoy sidecar may then be unable to reach Istio
// Pilot. It never rejects the pod.
func checkPreconditions(params InjectionParameters) {
	policy := params.pod.Spec.DNSPolicy
	if policy == "" || policy == corev1.DNSClusterFirst {
		return
	}

	meta := params.pod.ObjectMeta
	log.Warnf("%q's DNSPolicy is not %q. The Envoy sidecar may not able to connect to Istio Pilot",
		meta.Namespace+"/"+potentialPodName(meta), corev1.DNSClusterFirst)
}
// getInjectionStatus builds the JSON value of the sidecar status annotation:
// the names of the init containers, containers, volumes and image pull secrets
// in podSpec, plus the revision ("default" when revision is empty).
// It returns "{}" if the status cannot be marshalled.
func getInjectionStatus(podSpec corev1.PodSpec, revision string) string {
	var stat SidecarInjectionStatus

	for i := range podSpec.InitContainers {
		stat.InitContainers = append(stat.InitContainers, podSpec.InitContainers[i].Name)
	}
	for i := range podSpec.Containers {
		stat.Containers = append(stat.Containers, podSpec.Containers[i].Name)
	}
	for i := range podSpec.Volumes {
		stat.Volumes = append(stat.Volumes, podSpec.Volumes[i].Name)
	}
	for i := range podSpec.ImagePullSecrets {
		stat.ImagePullSecrets = append(stat.ImagePullSecrets, podSpec.ImagePullSecrets[i].Name)
	}

	// Rather than setting istio.io/rev label on injected pods include them here in status annotation.
	// This keeps us from overwriting the istio.io/rev label when using revision tags (i.e. istio.io/rev=<tag>).
	stat.Revision = revision
	if stat.Revision == "" {
		stat.Revision = "default"
	}

	encoded, err := json.Marshal(&stat)
	if err != nil {
		return "{}"
	}
	return string(encoded)
}
// injectPod is the core of the injection logic. It takes a pod and injection
// template, as well as some inputs to the injection template, and produces a
// JSON patch.
//
// In the webhook, we receive a Pod directly from Kubernetes and return the
// patch directly; Kubernetes takes care of applying the patch.
//
// For kube-inject, we parse a Pod out of YAML (which may involve extraction
// from higher level types like Deployment), then apply the patch locally.
//
// The injection logic works by first applying the rendered injection template
// on top of the input pod. This is done using a Strategic Merge Patch
// (https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/strategic-merge-patch.md).
//
// In addition to the plain templating, some post processing handles cases that
// cannot feasibly be covered in the template, such as re-ordering containers,
// rewriting readiness probes, etc.
func injectPod(req InjectionParameters) ([]byte, error) {
	checkPreconditions(req)

	// The patch is computed against the pod as it looks right now, so snapshot it first.
	before, err := json.Marshal(req.pod)
	if err != nil {
		return nil, err
	}

	// Render the template; this yields the merged pod and the injected partial pod.
	merged, injected, err := RunTemplate(req)
	if err != nil {
		return nil, fmt.Errorf("failed to run injection template: %v", err)
	}

	merged, err = reapplyOverwrittenContainers(merged, req.pod, injected)
	if err != nil {
		return nil, fmt.Errorf("failed to re apply container: %v", err)
	}

	// Transformations that cannot be expressed in the template.
	if err = postProcessPod(merged, *injected, req); err != nil {
		return nil, fmt.Errorf("failed to process pod: %v", err)
	}

	patch, err := createPatch(merged, before)
	if err != nil {
		return nil, fmt.Errorf("failed to create patch: %v", err)
	}

	log.Debugf("AdmissionResponse: patch=%v\n", string(patch))
	return patch, nil
}
// reapplyOverwrittenContainers enables users to provide container level overrides for settings in the injection template
// * originalPod: the pod before injection. If needed, we will apply some configurations from this pod on top of the final pod
// * templatePod: the rendered injection template. This is needed only to see what containers we injected
// * finalPod: the current result of injection, roughly equivalent to the merging of originalPod and templatePod
// There are essentially three cases we cover here:
// 1. There is no overlap in containers in original and template pod. We will do nothing.
// 2. There is an overlap (ie, both define istio-proxy), but that is because the pod is being re-injected.
// In this case we do nothing, since we want to apply the new settings
// 3. There is an overlap. We will re-apply the original container.
//
// Where "overlap" is a container defined in both the original and template pod. Typically, this would mean
// the user has defined an `istio-proxy` container in their own pod spec.
//
// NOTE(review): the code below re-applies a matching container whether or not the pod already
// carries the sidecar status annotation; only the writing of the overrides annotation is skipped
// for such pods. Confirm that this is what case 2 intends.
//
// It returns the resulting pod, or an error if the overrides annotation cannot be decoded
// or an overlay cannot be applied.
func reapplyOverwrittenContainers(finalPod *corev1.Pod, originalPod *corev1.Pod, templatePod *corev1.Pod) (*corev1.Pod, error) {
type podOverrides struct {
Containers []corev1.Container `json:"containers,omitempty"`
InitContainers []corev1.Container `json:"initContainers,omitempty"`
}
// overrides collects what is re-applied during this call; existingOverrides is
// what an earlier injection recorded in the pod annotation.
overrides := podOverrides{}
existingOverrides := podOverrides{}
if annotationOverrides, f := originalPod.Annotations[annotation.ProxyOverrides.Name]; f {
if err := json.Unmarshal([]byte(annotationOverrides), &existingOverrides); err != nil {
return nil, err
}
}
// For each injected container, prefer the override recorded in the annotation
// and fall back to a container of the same name in the original pod.
for _, c := range templatePod.Spec.Containers {
match := FindContainer(c.Name, existingOverrides.Containers)
if match == nil {
match = FindContainer(c.Name, originalPod.Spec.Containers)
}
if match == nil {
continue
}
overlay := *match.DeepCopy()
// AutoImage means "keep the injected image": clear it so the overlay does not set one.
if overlay.Image == AutoImage {
overlay.Image = ""
}
overrides.Containers = append(overrides.Containers, overlay)
newMergedPod, err := applyContainer(finalPod, overlay)
if err != nil {
return nil, fmt.Errorf("failed to apply sidecar container: %v", err)
}
finalPod = newMergedPod
}
// Same procedure for init containers.
for _, c := range templatePod.Spec.InitContainers {
match := FindContainer(c.Name, existingOverrides.InitContainers)
if match == nil {
match = FindContainer(c.Name, originalPod.Spec.InitContainers)
}
if match == nil {
continue
}
overlay := *match.DeepCopy()
if overlay.Image == AutoImage {
overlay.Image = ""
}
overrides.InitContainers = append(overrides.InitContainers, overlay)
newMergedPod, err := applyInitContainer(finalPod, overlay)
if err != nil {
return nil, fmt.Errorf("failed to apply sidecar init container: %v", err)
}
finalPod = newMergedPod
}
_, alreadyInjected := originalPod.Annotations[annotation.SidecarStatus.Name]
if !alreadyInjected && (len(overrides.Containers) > 0 || len(overrides.InitContainers) > 0) {
// We found overrides. Put them in the pod annotation so we can re-apply them on re-injection
js, err := json.Marshal(overrides)
if err != nil {
return nil, err
}
if finalPod.Annotations == nil {
finalPod.Annotations = map[string]string{}
}
finalPod.Annotations[annotation.ProxyOverrides.Name] = string(js)
}
return finalPod, nil
}
// reinsertOverrides returns a copy of pod to which the containers and init
// containers recorded in the proxy overrides annotation have been appended,
// unless a container of the same name is already present.
//
// This provides idempotency for the edge case where an injection template
// modifies a container that was already part of the pod spec: stripping the
// injected containers would otherwise remove the user's original parts too and
// change what the template sees (for example an empty .Spec.Containers).
//
// The input pod is not modified. An undecodable annotation yields an error.
func reinsertOverrides(pod *corev1.Pod) (*corev1.Pod, error) {
	type podOverrides struct {
		Containers     []corev1.Container `json:"containers,omitempty"`
		InitContainers []corev1.Container `json:"initContainers,omitempty"`
	}

	var recorded podOverrides
	if raw, ok := pod.Annotations[annotation.ProxyOverrides.Name]; ok {
		if err := json.Unmarshal([]byte(raw), &recorded); err != nil {
			return nil, err
		}
	}

	out := pod.DeepCopy()
	for _, c := range recorded.Containers {
		if FindContainer(c.Name, out.Spec.Containers) == nil {
			out.Spec.Containers = append(out.Spec.Containers, c)
		}
	}
	for _, c := range recorded.InitContainers {
		if FindContainer(c.Name, out.Spec.InitContainers) == nil {
			out.Spec.InitContainers = append(out.Spec.InitContainers, c)
		}
	}
	return out, nil
}
// createPatch returns the JSON-encoded JSON patch that transforms original
// (a JSON document) into the JSON form of pod.
func createPatch(pod *corev1.Pod, original []byte) ([]byte, error) {
	updated, err := json.Marshal(pod)
	if err != nil {
		return nil, err
	}

	operations, err := jsonpatch.CreatePatch(original, updated)
	if err != nil {
		return nil, err
	}
	return json.Marshal(operations)
}
// postProcessPod applies additional transformations to the pod after it has
// been merged with the injected template. These are generally things that
// cannot reasonably be expressed in the template itself.
//
// The steps run in a fixed order: cluster info overwrite, prometheus merge,
// probe rewrite, metadata, container reordering. The first error stops processing.
func postProcessPod(pod *corev1.Pod, injectedPod corev1.Pod, req InjectionParameters) error {
	// Later steps write into these maps, so make sure they exist.
	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}

	overwriteClusterInfo(pod.Spec.Containers, req)

	if err := applyPrometheusMerge(pod, req.meshConfig); err != nil {
		return err
	}
	if err := applyRewrite(pod, req); err != nil {
		return err
	}

	applyMetadata(pod, injectedPod, req)

	return reorderPod(pod, req)
}
// applyMetadata stamps injection metadata onto pod: the topology network label
// (when ISTIO_META_NETWORK is among the proxy envs), the sidecar status
// annotation and every configured injected annotation.
//
// pod.Labels and pod.Annotations must be non-nil.
func applyMetadata(pod *corev1.Pod, injectedPodData corev1.Pod, req InjectionParameters) {
	if network, found := req.proxyEnvs["ISTIO_META_NETWORK"]; found {
		pod.Labels[label.TopologyNetwork.Name] = network
	}

	annotations := pod.Annotations
	annotations[annotation.SidecarStatus.Name] = getInjectionStatus(injectedPodData.Spec, req.revision)

	// Deprecated; should be set directly in the template instead.
	// These are written last, so they win over anything set above.
	for key, value := range req.injectedAnnotations {
		annotations[key] = value
	}
}
// reorderPod ensures containers are properly ordered after merging.
//
// The proxy container goes last, or first when HoldApplicationUntilProxyStarts
// is enabled in the mesh config (including the pod's proxy config annotation)
// or in the values. Among init containers, the validation container is moved
// first, then the init container and the core dump container are moved to the
// end in that order.
func reorderPod(pod *corev1.Pod, req InjectionParameters) error {
	effectiveMesh := req.meshConfig
	// Take the pod-level proxy config annotation into account when deciding the order.
	if overrides, ok := req.pod.ObjectMeta.GetAnnotations()[annotation.ProxyConfig.Name]; ok {
		applied, err := mesh.ApplyProxyConfig(overrides, req.meshConfig)
		if err != nil {
			return err
		}
		effectiveMesh = applied
	}

	// nolint: staticcheck
	holdApp := effectiveMesh.GetDefaultConfig().GetHoldApplicationUntilProxyStarts().GetValue() ||
		req.valuesConfig.asStruct.GetGlobal().GetProxy().GetHoldApplicationUntilProxyStarts().GetValue()

	// The proxy is last by default so that `kubectl exec` and similar commands
	// keep defaulting to the user's container; holding the application flips that.
	where := MoveLast
	if holdApp {
		where = MoveFirst
	}
	pod.Spec.Containers = modifyContainers(pod.Spec.Containers, ProxyContainerName, where)

	// Validation container must be first to block any user containers.
	initContainers := modifyContainers(pod.Spec.InitContainers, ValidationContainerName, MoveFirst)
	// Init container must be last to allow any traffic to pass before iptables is setup.
	initContainers = modifyContainers(initContainers, InitContainerName, MoveLast)
	pod.Spec.InitContainers = modifyContainers(initContainers, EnableCoreDumpName, MoveLast)

	return nil
}
// applyRewrite rewrites the application's probes when probe rewriting is
// enabled for the pod. The dumped prober configuration, if any, is exposed to
// the sidecar through an environment variable. A pod without a sidecar is left
// untouched. The returned error is always nil.
func applyRewrite(pod *corev1.Pod, req InjectionParameters) error {
	proxy := FindSidecar(pod.Spec.Containers)
	if proxy == nil {
		return nil
	}

	defaultRewrite := req.valuesConfig.asStruct.GetSidecarInjectorWebhook().GetRewriteAppHTTPProbe().GetValue()
	if !ShouldRewriteAppHTTPProbers(pod.Annotations, defaultRewrite) {
		return nil
	}

	// We don't have to escape json encoding here when using golang libraries.
	statusPort := req.meshConfig.GetDefaultConfig().GetStatusPort()
	if probers := DumpAppProbers(&pod.Spec, statusPort); probers != "" {
		proxy.Env = append(proxy.Env, corev1.EnvVar{Name: status.KubeAppProberEnvName, Value: probers})
	}
	patchRewriteProbe(pod.Annotations, pod, statusPort)
	return nil
}
// emptyScrape is the zero scrape configuration; applyPrometheusMerge compares
// against it to detect that the pod carried no prometheus settings.
var emptyScrape = status.PrometheusScrapeConfiguration{}
// applyPrometheusMerge configures prometheus scraping annotations for the
// "metrics merge" feature. It moves the pod's current prometheus.io settings
// into an environment variable on the sidecar and replaces the annotations
// with ones pointing at the agent's status port.
//
// Nothing happens when scraping is disabled on the pod, when merging is
// disabled, or when the port annotation already equals the status port.
func applyPrometheusMerge(pod *corev1.Pod, mesh *meshconfig.MeshConfig) error {
	if !getPrometheusScrape(pod) || !enablePrometheusMerge(mesh, pod.ObjectMeta.Annotations) {
		return nil
	}

	targetPort := strconv.Itoa(int(mesh.GetDefaultConfig().GetStatusPort()))
	// The port is already ours: assume the user is controlling this or, more
	// likely, the pod is being re-injected.
	if current, found := getPrometheusPort(pod); found && current == targetPort {
		return nil
	}

	scrape := getPrometheusScrapeConfiguration(pod)
	proxy := FindSidecar(pod.Spec.Containers)
	if proxy != nil && scrape != emptyScrape {
		encoded, err := json.Marshal(scrape)
		if err != nil {
			return err
		}
		proxy.Env = append(proxy.Env, corev1.EnvVar{Name: status.PrometheusScrapingConfig.Name, Value: string(encoded)})
	}

	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	// If a user sets one spelling variant (e.g. `prometheus-io/path: foo`) and we
	// add `prometheus.io/path: /stats/prometheus`, prometheus will pick a random
	// one, so clear out all variants before setting ours.
	clearPrometheusAnnotations(pod)
	pod.Annotations["prometheus.io/port"] = targetPort
	pod.Annotations["prometheus.io/path"] = "/stats/prometheus"
	pod.Annotations["prometheus.io/scrape"] = "true"
	return nil
}
// getPrometheusScrape respects the pod's prometheus scrape annotation: it
// returns the value of the first annotation encountered whose sanitized name is
// the scrape annotation and whose value parses as a bool. Without such an
// annotation it returns true. Callers skip the prometheus merge on false.
func getPrometheusScrape(pod *corev1.Pod) bool {
	for key, raw := range pod.Annotations {
		if strutil.SanitizeLabelName(key) == prometheusScrapeAnnotation {
			if enabled, err := strconv.ParseBool(raw); err == nil {
				return enabled
			}
		}
	}
	return true
}
// prometheusAnnotations is the set of sanitized prometheus annotation names
// that clearPrometheusAnnotations removes from a pod.
var prometheusAnnotations = sets.New(
prometheusPathAnnotation,
prometheusPortAnnotation,
prometheusScrapeAnnotation,
)
// clearPrometheusAnnotations removes every pod annotation whose sanitized name
// is one of the prometheus path, port or scrape annotations, whatever spelling
// variant was used.
func clearPrometheusAnnotations(pod *corev1.Pod) {
	// Deleting entries while ranging over a map is safe in Go.
	for key := range pod.Annotations {
		if prometheusAnnotations.Contains(strutil.SanitizeLabelName(key)) {
			delete(pod.Annotations, key)
		}
	}
}
// getPrometheusScrapeConfiguration collects the pod's prometheus port, scrape
// and path annotation values (matched by sanitized name) into a scrape
// configuration. Fields without a matching annotation stay at their zero value.
func getPrometheusScrapeConfiguration(pod *corev1.Pod) status.PrometheusScrapeConfiguration {
	var result status.PrometheusScrapeConfiguration
	for key, value := range pod.Annotations {
		sanitized := strutil.SanitizeLabelName(key)
		if sanitized == prometheusPortAnnotation {
			result.Port = value
		} else if sanitized == prometheusScrapeAnnotation {
			result.Scrape = value
		} else if sanitized == prometheusPathAnnotation {
			result.Path = value
		}
	}
	return result
}
// getPrometheusPort returns the value of the first pod annotation encountered
// whose sanitized name is the prometheus port annotation, and whether one was found.
func getPrometheusPort(pod *corev1.Pod) (string, bool) {
	for key, value := range pod.Annotations {
		if strutil.SanitizeLabelName(key) == prometheusPortAnnotation {
			return value, true
		}
	}
	return "", false
}
const (
// AutoImage is the special image name that tells the injector to use the injected image and NOT override it.
// This is necessary because image is a required field on a container, so a user who defines an istio-proxy
// container with customizations must set an image. reapplyOverwrittenContainers clears the image of an
// override whose image equals AutoImage before applying it.
AutoImage = "auto"
)
// applyContainer merges a container spec on top of the provided pod and
// returns the resulting pod.
func applyContainer(target *corev1.Pod, container corev1.Container) (*corev1.Pod, error) {
	var overlay corev1.Pod
	overlay.Spec.Containers = []corev1.Container{container}

	overlayJSON, err := json.Marshal(&overlay)
	if err != nil {
		return nil, err
	}
	return applyOverlay(target, overlayJSON)
}
// applyInitContainer merges a container spec on top of the provided pod as an
// init container and returns the resulting pod.
func applyInitContainer(target *corev1.Pod, container corev1.Container) (*corev1.Pod, error) {
	var overlay corev1.Pod
	// Containers must be an empty, non-nil list: otherwise it marshals as
	// "null" and the merge deletes all containers.
	overlay.Spec.Containers = []corev1.Container{}
	overlay.Spec.InitContainers = []corev1.Container{container}

	overlayJSON, err := json.Marshal(&overlay)
	if err != nil {
		return nil, err
	}
	return applyOverlay(target, overlayJSON)
}
// patchHandleUnmarshal decodes the document j into a generic map using the
// supplied unmarshal function. A nil document is treated as the empty object
// "{}". Any decoding failure is reported as mergepatch.ErrBadJSONDoc.
func patchHandleUnmarshal(j []byte, unmarshal func(data []byte, v interface{}) error) (map[string]interface{}, error) {
	doc := j
	if doc == nil {
		doc = []byte("{}")
	}

	decoded := make(map[string]interface{})
	if err := unmarshal(doc, &decoded); err != nil {
		return nil, mergepatch.ErrBadJSONDoc
	}
	return decoded, nil
}
// StrategicMergePatchYAML is a small fork of strategicpatch.StrategicMergePatch
// that accepts the patch as YAML, which avoids an expensive conversion from
// YAML to JSON. originalJSON is the JSON document to patch and dataStruct
// supplies the patch metadata. The patched document is returned as JSON.
func StrategicMergePatchYAML(originalJSON []byte, patchYAML []byte, dataStruct interface{}) ([]byte, error) {
	schema, err := strategicpatch.NewPatchMetaFromStruct(dataStruct)
	if err != nil {
		return nil, err
	}

	originalMap, err := patchHandleUnmarshal(originalJSON, json.Unmarshal)
	if err != nil {
		return nil, err
	}

	// Adapter that exposes yaml.Unmarshal with the two-argument signature
	// patchHandleUnmarshal expects.
	decodeYAML := func(data []byte, v interface{}) error {
		return yaml.Unmarshal(data, v)
	}
	patchMap, err := patchHandleUnmarshal(patchYAML, decodeYAML)
	if err != nil {
		return nil, err
	}

	merged, err := strategicpatch.StrategicMergeMapPatchUsingLookupPatchMeta(originalMap, patchMap, schema)
	if err != nil {
		return nil, err
	}
	return json.Marshal(merged)
}
// applyOverlayYAML merges a pod overlay, provided as YAML, on top of the
// provided pod using a strategic merge patch and returns the resulting pod.
// The target pod itself is not modified.
func applyOverlayYAML(target *corev1.Pod, overlayYAML []byte) (*corev1.Pod, error) {
	targetJSON, err := json.Marshal(target)
	if err != nil {
		return nil, err
	}

	var merged corev1.Pod
	// Overlay the injected template onto the original pod.
	patched, err := StrategicMergePatchYAML(targetJSON, overlayYAML, merged)
	if err != nil {
		return nil, fmt.Errorf("strategic merge: %v", err)
	}

	if err = json.Unmarshal(patched, &merged); err != nil {
		return nil, fmt.Errorf("unmarshal patched pod: %v", err)
	}
	return &merged, nil
}
// applyOverlay merges a pod overlay, provided as JSON, on top of the provided
// pod using a strategic merge patch and returns the resulting pod.
// The target pod itself is not modified.
func applyOverlay(target *corev1.Pod, overlayJSON []byte) (*corev1.Pod, error) {
	targetJSON, err := json.Marshal(target)
	if err != nil {
		return nil, err
	}

	var merged corev1.Pod
	// Overlay the injected template onto the original pod.
	patched, err := strategicpatch.StrategicMergePatch(targetJSON, overlayJSON, merged)
	if err != nil {
		return nil, fmt.Errorf("strategic merge: %v", err)
	}

	if err = json.Unmarshal(patched, &merged); err != nil {
		return nil, fmt.Errorf("unmarshal patched pod: %v", err)
	}
	return &merged, nil
}
// inject handles a single admission review: it decodes the pod from the
// request, decides whether injection is required and, if so, returns an
// admission response carrying the JSON patch produced by injectPod.
//
// path is the request URL path; environment overrides are parsed from it.
// A review that is nil or carries no request, an undecodable pod, or a failed
// injection is counted as a failed injection and yields a response whose
// result carries the error message. A pod that does not require injection is
// allowed without a patch.
func (wh *Webhook) inject(ar *kube.AdmissionReview, path string) *kube.AdmissionResponse {
	// Guard against a missing review/request instead of panicking on the
	// dereferences below.
	if ar == nil || ar.Request == nil {
		err := errors.New("admission review contains no request")
		handleError(fmt.Sprintf("Could not process admission review: %v", err))
		return toAdmissionResponse(err)
	}

	req := ar.Request
	var pod corev1.Pod
	if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
		handleError(fmt.Sprintf("Could not unmarshal raw object: %v %s", err,
			string(req.Object.Raw)))
		return toAdmissionResponse(err)
	}
	// Managed fields is sometimes extremely large, leading to excessive CPU time on patch generation
	// It does not impact the injection output at all, so we can just remove it.
	pod.ManagedFields = nil

	// Deal with potential empty fields, e.g., when the pod is created by a deployment
	podName := potentialPodName(pod.ObjectMeta)
	if pod.ObjectMeta.Namespace == "" {
		pod.ObjectMeta.Namespace = req.Namespace
	}
	log.Infof("Sidecar injection request for %v/%v", req.Namespace, podName)
	log.Debugf("Object: %v", string(req.Object.Raw))
	log.Debugf("OldObject: %v", string(req.OldObject.Raw))

	// The read lock covers every access to the webhook's mutable configuration;
	// it is released before the (expensive) injection itself runs.
	wh.mu.RLock()
	if !injectRequired(IgnoredNamespaces.UnsortedList(), wh.Config, &pod.Spec, pod.ObjectMeta) {
		log.Infof("Skipping %s/%s due to policy check", pod.ObjectMeta.Namespace, podName)
		totalSkippedInjections.Increment()
		wh.mu.RUnlock()
		return &kube.AdmissionResponse{
			Allowed: true,
		}
	}

	// Start from the default proxy config and prefer the effective one computed
	// for this pod when the push context provides it.
	proxyConfig := mesh.DefaultProxyConfig()
	if wh.env.PushContext != nil && wh.env.PushContext.ProxyConfigs != nil {
		if generatedProxyConfig := wh.env.PushContext.ProxyConfigs.EffectiveProxyConfig(
			&model.NodeMetadata{
				Namespace:   pod.Namespace,
				Labels:      pod.Labels,
				Annotations: pod.Annotations,
			}, wh.meshConfig); generatedProxyConfig != nil {
			proxyConfig = generatedProxyConfig
		}
	}
	deploy, typeMeta := kube.GetDeployMetaFromPod(&pod)
	params := InjectionParameters{
		pod:                 &pod,
		deployMeta:          deploy,
		typeMeta:            typeMeta,
		templates:           wh.Config.Templates,
		defaultTemplate:     wh.Config.DefaultTemplates,
		aliases:             wh.Config.Aliases,
		meshConfig:          wh.meshConfig,
		proxyConfig:         proxyConfig,
		valuesConfig:        wh.valuesConfig,
		revision:            wh.revision,
		injectedAnnotations: wh.Config.InjectedAnnotations,
		proxyEnvs:           parseInjectEnvs(path),
	}
	wh.mu.RUnlock()

	patchBytes, err := injectPod(params)
	if err != nil {
		handleError(fmt.Sprintf("Pod injection failed: %v", err))
		return toAdmissionResponse(err)
	}

	reviewResponse := kube.AdmissionResponse{
		Allowed: true,
		Patch:   patchBytes,
		PatchType: func() *string {
			pt := "JSONPatch"
			return &pt
		}(),
	}
	totalSuccessfulInjections.Increment()
	return &reviewResponse
}
// serveInject is the HTTP handler behind the "/inject" endpoints.
//
// It reads the request body, requires Content-Type "application/json", decodes
// the body into an admission review, runs the injection and writes back an
// admission review of the same API version carrying the response.
//
// An unreadable or empty body is answered with 400 and a wrong content type
// with 415. A body that cannot be decoded or converted into an admission
// review is answered with a review whose response carries the error; the
// injection is not attempted in that case.
func (wh *Webhook) serveInject(w http.ResponseWriter, r *http.Request) {
	totalInjections.Increment()
	var body []byte
	if r.Body != nil {
		data, err := kube.HTTPConfigReader(r)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		body = data
	}
	if len(body) == 0 {
		handleError("no body found")
		http.Error(w, "no body found", http.StatusBadRequest)
		return
	}

	// verify the content type is accurate
	contentType := r.Header.Get("Content-Type")
	if contentType != "application/json" {
		handleError(fmt.Sprintf("contentType=%s, expect application/json", contentType))
		http.Error(w, "invalid Content-Type, want `application/json`", http.StatusUnsupportedMediaType)
		return
	}

	path := ""
	if r.URL != nil {
		path = r.URL.Path
	}

	var reviewResponse *kube.AdmissionResponse
	var obj runtime.Object
	var ar *kube.AdmissionReview
	if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
		handleError(fmt.Sprintf("Could not decode body: %v", err))
		reviewResponse = toAdmissionResponse(err)
	} else {
		log.Debugf("AdmissionRequest for path=%s\n", path)
		ar, err = kube.AdmissionReviewKubeToAdapter(out)
		if err != nil {
			// Without a usable review there is nothing to inject; report the
			// conversion error instead of handing an invalid review to inject.
			handleError(fmt.Sprintf("Could not decode object: %v", err))
			reviewResponse = toAdmissionResponse(err)
		} else {
			reviewResponse = wh.inject(ar, path)
		}
	}

	response := kube.AdmissionReview{}
	response.Response = reviewResponse
	var responseKube runtime.Object
	var apiVersion string
	if ar != nil {
		// Echo the request's type information and UID back to the caller.
		apiVersion = ar.APIVersion
		response.TypeMeta = ar.TypeMeta
		if response.Response != nil {
			if ar.Request != nil {
				response.Response.UID = ar.Request.UID
			}
		}
	}
	responseKube = kube.AdmissionReviewAdapterToKube(&response, apiVersion)
	resp, err := json.Marshal(responseKube)
	if err != nil {
		log.Errorf("Could not encode response: %v", err)
		http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
		// The error reply has been sent; do not also write the (empty) payload.
		return
	}
	if _, err := w.Write(resp); err != nil {
		log.Errorf("Could not write response: %v", err)
		http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
	}
}
// parseInjectEnvs parse new envs from inject url path
// follow format: /inject/k1/v1/k2/v2 when values do not contain slashes,
// follow format: /inject/:ENV:net=network1:ENV:cluster=cluster1:ENV:rootpage=/foo/bar
// when values contain slashes.
func parseInjectEnvs(path string) map[string]string {
path = strings.TrimSuffix(path, "/")
res := func(path string) []string {
parts := strings.SplitN(path, "/", 3)
// The 3rd part has to start with separator :ENV:
// If not, this inject path is considered using slash as separator
// If length is less than 3, then the path is simply "/inject",
// process just like before :ENV: separator is introduced.
var newRes []string
if len(parts) == 3 {
if strings.HasPrefix(parts[2], ":ENV:") {
pairs := strings.Split(parts[2], ":ENV:")
for i := 1; i < len(pairs); i++ { // skip the first part, it is a nil
pair := strings.SplitN(pairs[i], "=", 2)
// The first part is the variable name which can not be empty
// the second part is the variable value which can be empty but has to exist
// for example, aaa=bbb, aaa= are valid, but =aaa or = are not valid, the
// invalid ones will be ignored.
if len(pair[0]) > 0 && len(pair) == 2 {
newRes = append(newRes, pair...)
}
}
return newRes
}
return strings.Split(parts[2], "/")
}
return newRes
}(path)
newEnvs := make(map[string]string)
for i := 0; i < len(res); i += 2 {
k := res[i]
if i == len(res)-1 { // ignore the last key without value
log.Warnf("Odd number of inject env entries, ignore the last key %s\n", k)
break
}
env, found := URLParameterToEnv[k]
if !found {
env = strings.ToUpper(k) // if not found, use the custom env directly
}
if env != "" {
newEnvs[env] = res[i+1]
}
}
return newEnvs
}
// handleError logs message as an error and counts a failed injection.
func handleError(message string) {
	// message is data, not a format: it may embed request bodies or error text
	// containing '%', so it must not be interpreted as a format string.
	log.Errorf("%s", message)
	totalFailedInjections.Increment()
}