blob: a3655f3b78d59853579b46c239b8671a7d63651a [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package inject implements kube-inject or webhoook autoinject feature to inject sidecar.
// This file is focused on rewriting Kubernetes app probers to support mutual TLS.
package inject
import (
"encoding/json"
"strconv"
)
import (
"istio.io/api/annotation"
"istio.io/pkg/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/status"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
)
// ShouldRewriteAppHTTPProbers returns if we should rewrite apps' probers config.
func ShouldRewriteAppHTTPProbers(annotations map[string]string, specSetting bool) bool {
if annotations != nil {
if value, ok := annotations[annotation.SidecarRewriteAppHTTPProbers.Name]; ok {
if isSetInAnnotation, err := strconv.ParseBool(value); err == nil {
return isSetInAnnotation
}
}
}
return specSetting
}
// FindSidecar returns the pointer to the first container whose name matches the "istio-proxy".
func FindSidecar(containers []corev1.Container) *corev1.Container {
return FindContainer(ProxyContainerName, containers)
}
// FindContainer returns the pointer to the first container whose name matches.
func FindContainer(name string, containers []corev1.Container) *corev1.Container {
for i := range containers {
if containers[i].Name == name {
return &containers[i]
}
}
return nil
}
// convertAppProber returns an overwritten `Probe` for pilot agent to take over.
func convertAppProber(probe *corev1.Probe, newURL string, statusPort int) *corev1.Probe {
if probe == nil {
return nil
}
if probe.HTTPGet != nil {
return convertAppProberHTTPGet(probe, newURL, statusPort)
} else if probe.TCPSocket != nil && features.RewriteTCPProbes {
return convertAppProberTCPSocket(probe, newURL, statusPort)
} else if probe.GRPC != nil {
return convertAppProberGRPC(probe, newURL, statusPort)
}
return nil
}
// convertAppProberHTTPGet returns an overwritten `Probe` (HttpGet) for pilot agent to take over.
func convertAppProberHTTPGet(probe *corev1.Probe, newURL string, statusPort int) *corev1.Probe {
p := probe.DeepCopy()
// Change the application container prober config.
p.HTTPGet.Port = intstr.FromInt(statusPort)
p.HTTPGet.Path = newURL
// For HTTPS prober, we change to HTTP,
// and pilot agent uses https to request application prober endpoint.
// Kubelet -> HTTP -> Pilot Agent -> HTTPS -> Application
if p.HTTPGet.Scheme == corev1.URISchemeHTTPS {
p.HTTPGet.Scheme = corev1.URISchemeHTTP
}
return p
}
// convertAppProberTCPSocket returns an overwritten `Probe` (TcpSocket) for pilot agent to take over.
func convertAppProberTCPSocket(probe *corev1.Probe, newURL string, statusPort int) *corev1.Probe {
p := probe.DeepCopy()
// the sidecar intercepts all tcp connections, so we change it to a HTTP probe and the sidecar will check tcp
p.HTTPGet = &corev1.HTTPGetAction{}
p.HTTPGet.Port = intstr.FromInt(statusPort)
p.HTTPGet.Path = newURL
p.TCPSocket = nil
return p
}
// convertAppProberGRPC returns an overwritten `Probe` (gRPC) for pilot agent to take over.
func convertAppProberGRPC(probe *corev1.Probe, newURL string, statusPort int) *corev1.Probe {
p := probe.DeepCopy()
// the sidecar intercepts all gRPC connections, so we change it to a HTTP probe and the sidecar will check gRPC
p.HTTPGet = &corev1.HTTPGetAction{}
p.HTTPGet.Port = intstr.FromInt(statusPort)
p.HTTPGet.Path = newURL
// For gRPC prober, we change to HTTP,
// and pilot agent uses gRPC to request application prober endpoint.
// Kubelet -> HTTP -> Pilot Agent -> gRPC -> Application
p.GRPC = nil
return p
}
type KubeAppProbers map[string]*Prober
// Prober represents a single container prober
type Prober struct {
HTTPGet *corev1.HTTPGetAction `json:"httpGet,omitempty"`
TCPSocket *corev1.TCPSocketAction `json:"tcpSocket,omitempty"`
GRPC *corev1.GRPCAction `json:"grpc,omitempty"`
TimeoutSeconds int32 `json:"timeoutSeconds,omitempty"`
}
// DumpAppProbers returns a json encoded string as `status.KubeAppProbers`.
// Also update the probers so that all usages of named port will be resolved to integer.
func DumpAppProbers(podSpec *corev1.PodSpec, targetPort int32) string {
out := KubeAppProbers{}
updateNamedPort := func(p *Prober, portMap map[string]int32) *Prober {
if p == nil {
return nil
}
if p.GRPC != nil {
// don't need to update for gRPC probe port as it only supports integer
return p
}
if p.HTTPGet == nil && p.TCPSocket == nil {
return nil
}
var probePort *intstr.IntOrString
if p.HTTPGet != nil {
probePort = &p.HTTPGet.Port
} else {
probePort = &p.TCPSocket.Port
}
if probePort.Type == intstr.String {
port, exists := portMap[probePort.StrVal]
if !exists {
return nil
}
*probePort = intstr.FromInt(int(port))
} else if probePort.IntVal == targetPort {
// Already is rewritten
return nil
}
return p
}
for _, c := range podSpec.Containers {
if c.Name == ProxyContainerName {
continue
}
readyz, livez, startupz := status.FormatProberURL(c.Name)
portMap := map[string]int32{}
for _, p := range c.Ports {
if p.Name != "" {
portMap[p.Name] = p.ContainerPort
}
}
if h := updateNamedPort(kubeProbeToInternalProber(c.ReadinessProbe), portMap); h != nil {
out[readyz] = h
}
if h := updateNamedPort(kubeProbeToInternalProber(c.LivenessProbe), portMap); h != nil {
out[livez] = h
}
if h := updateNamedPort(kubeProbeToInternalProber(c.StartupProbe), portMap); h != nil {
out[startupz] = h
}
}
// prevent generate '{}'
if len(out) == 0 {
return ""
}
b, err := json.Marshal(out)
if err != nil {
log.Errorf("failed to serialize the app prober config %v", err)
return ""
}
return string(b)
}
// patchRewriteProbe generates the patch for webhook.
func patchRewriteProbe(annotations map[string]string, pod *corev1.Pod, defaultPort int32) {
statusPort := int(defaultPort)
if v, f := annotations[annotation.SidecarStatusPort.Name]; f {
p, err := strconv.Atoi(v)
if err != nil {
log.Errorf("Invalid annotation %v=%v: %v", annotation.SidecarStatusPort.Name, v, err)
}
statusPort = p
}
for i, c := range pod.Spec.Containers {
// Skip sidecar container.
if c.Name == ProxyContainerName {
continue
}
readyz, livez, startupz := status.FormatProberURL(c.Name)
if probePatch := convertAppProber(c.ReadinessProbe, readyz, statusPort); probePatch != nil {
c.ReadinessProbe = probePatch
}
if probePatch := convertAppProber(c.LivenessProbe, livez, statusPort); probePatch != nil {
c.LivenessProbe = probePatch
}
if probePatch := convertAppProber(c.StartupProbe, startupz, statusPort); probePatch != nil {
c.StartupProbe = probePatch
}
pod.Spec.Containers[i] = c
}
}
// kubeProbeToInternalProber converts a Kubernetes Probe to an Istio internal Prober
func kubeProbeToInternalProber(probe *corev1.Probe) *Prober {
if probe == nil {
return nil
}
if probe.HTTPGet != nil {
return &Prober{
HTTPGet: probe.HTTPGet,
TimeoutSeconds: probe.TimeoutSeconds,
}
}
if probe.TCPSocket != nil && features.RewriteTCPProbes {
return &Prober{
TCPSocket: probe.TCPSocket,
TimeoutSeconds: probe.TimeoutSeconds,
}
}
if probe.GRPC != nil {
return &Prober{
GRPC: probe.GRPC,
TimeoutSeconds: probe.TimeoutSeconds,
}
}
return nil
}