blob: f353dee81edb65469420725e2858c61f2e29f95b [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package status
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"net"
"net/http"
"net/http/pprof"
"os"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"
)
import (
ocprom "contrib.go.opencensus.io/exporter/prometheus"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/common/expfmt"
"go.opencensus.io/stats/view"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
grpcHealth "google.golang.org/grpc/health/grpc_health_v1"
grpcStatus "google.golang.org/grpc/status"
"istio.io/pkg/env"
"istio.io/pkg/log"
"k8s.io/apimachinery/pkg/util/intstr"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/metrics"
"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/status/grpcready"
"github.com/apache/dubbo-go-pixiu/pilot/cmd/pilot-agent/status/ready"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/config"
dnsProto "github.com/apache/dubbo-go-pixiu/pkg/dns/proto"
"github.com/apache/dubbo-go-pixiu/pkg/kube/apimirror"
)
// Endpoints served by the status server for the pilot agent itself.
const (
	// readyPath reports the readiness of the pilot agent itself.
	readyPath = "/healthz/ready"
	// quitPath asks the pilot agent to exit.
	quitPath = "/quitquitquit"
)

// KubeAppProberEnvName is the name of the environment variable through which the injector
// (istioctl or the webhook) hands the JSON-encoded application HTTP probe configuration to
// the pilot agent. For example:
//
//	ISTIO_KUBE_APP_PROBERS='{"/app-health/httpbin/livez":{"httpGet":{"path": "/hello", "port": 8080}}}'
//
// indicates that the liveness prober of container httpbin targets port 8080 at path /hello.
// This environment variable should never be set manually.
const KubeAppProberEnvName = "ISTIO_KUBE_APP_PROBERS"

// Loopback hosts handed to the Envoy readiness probe as LocalHostAddr; NewServer picks
// one depending on Options.IPv6.
const (
	localHostIPv4 = "127.0.0.1"
	localHostIPv6 = "[::1]"
)
// UpstreamLocalAddressIPv4 is the local (source) address used when dialing the application
// for probes on IPv4 pods.
var UpstreamLocalAddressIPv4 = &net.TCPAddr{IP: net.ParseIP("127.0.0.6")}

// UpstreamLocalAddressIPv6 is the local (source) address used when dialing the application
// for probes on IPv6 pods.
var UpstreamLocalAddressIPv6 = &net.TCPAddr{IP: net.ParseIP("::6")}
// PrometheusScrapingConfig is ISTIO_PROMETHEUS_ANNOTATIONS: JSON decoded by NewServer into a
// PrometheusScrapeConfiguration that describes how to scrape the application's own metrics.
var PrometheusScrapingConfig = env.RegisterStringVar("ISTIO_PROMETHEUS_ANNOTATIONS", "", "")

// appProberPattern is the form every KubeAppProbers key must have.
var appProberPattern = regexp.MustCompile(`^/app-health/[^/]+/(livez|readyz|startupz)$`)

// promRegistry holds the agent's own metrics; it is created in init.
var promRegistry *prometheus.Registry

// LegacyLocalhostProbeDestination makes NewServer send application probes to "localhost"
// instead of the pod IP.
var LegacyLocalhostProbeDestination = env.RegisterBoolVar("REWRITE_PROBE_LEGACY_LOCALHOST_DESTINATION", false,
	"If enabled, readiness probes will be sent to 'localhost'. Otherwise, they will be sent to the Pod's IP, matching Kubernetes' behavior.")

// ProbeKeepaliveConnections controls whether the cached httpGet probe clients keep
// connections alive. It is read once, when the package is initialized.
var ProbeKeepaliveConnections = env.RegisterBoolVar("ENABLE_PROBE_KEEPALIVE_CONNECTIONS", false,
	"If enabled, readiness probes will keep the connection from pilot-agent to the application alive. "+
		"This mirrors older Istio versions' behaviors, but not kubelet's.").Get()
// KubeAppProbers holds the Kubernetes application probers that the agent serves on behalf
// of the kubelet. It maps the prober URL path on the status port to that prober's config.
// For example, the "/app-health/hello-world/livez" entry contains the liveness prober
// config for container "hello-world". NewServer rejects keys that do not match
// appProberPattern.
type KubeAppProbers map[string]*Prober

// Prober represents a single container prober, decoded from JSON.
// Exactly one of HTTPGet, TCPSocket and GRPC must be set; validateAppKubeProber enforces this.
type Prober struct {
	// HTTPGet probes the application with an HTTP(S) GET; its port must be an int.
	HTTPGet *apimirror.HTTPGetAction `json:"httpGet,omitempty"`
	// TCPSocket probes the application by opening a TCP connection; its port must be an int.
	TCPSocket *apimirror.TCPSocketAction `json:"tcpSocket,omitempty"`
	// GRPC probes the application with the gRPC health protocol (grpc.health.v1.Health).
	GRPC *apimirror.GRPCAction `json:"grpc,omitempty"`
	// TimeoutSeconds bounds each probe: it is the HTTP client timeout, the TCP dial timeout,
	// and the gRPC dial-and-check deadline.
	// NOTE(review): 0 means "no timeout" for HTTP/TCP but an already-expired deadline for
	// gRPC; presumably the injector always sets it — confirm.
	TimeoutSeconds int32 `json:"timeoutSeconds,omitempty"`
}
// Options for the status server, consumed by NewServer.
type Options struct {
	// Ip of the pod. Note: this is only applicable for Kubernetes pods and should only be used for
	// the prober. IPv6 literals are bracketed before use as the probe destination.
	PodIP string
	// KubeAppProbers is a json with Kubernetes application prober config encoded
	// (decoded into a KubeAppProbers map).
	KubeAppProbers string
	// NodeType gates application metrics merging, which is only enabled for sidecars.
	NodeType model.NodeType
	// StatusPort is the port the status server listens on; 0 lets the OS pick one.
	StatusPort uint16
	// AdminPort is passed to the Envoy readiness probe.
	AdminPort uint16
	// IPv6 selects the IPv6 loopback host and upstream local address.
	IPv6 bool
	// Probes are extra readiness probers, checked after the built-in ones.
	Probes []ready.Prober
	// EnvoyPrometheusPort is the localhost port Envoy's /stats/prometheus is scraped from.
	EnvoyPrometheusPort int
	// Context is passed to the Envoy readiness probe.
	Context context.Context
	// FetchDNS returns the name table served on /debug/ndsz.
	FetchDNS func() *dnsProto.NameTable
	// NoEnvoy disables the Envoy readiness probe and Envoy metrics scraping.
	NoEnvoy bool
	// GRPCBootstrap, when non-empty, adds a gRPC bootstrap readiness probe.
	GRPCBootstrap string
}
// Server provides an endpoint for handling status probes.
type Server struct {
	// ready are the readiness probers, checked in order by isReady.
	ready []ready.Prober
	// prometheus is the application scrape config; nil disables application metrics.
	prometheus *PrometheusScrapeConfiguration
	// mutex guards statusPort (rewritten in Run when 0) and lastProbeSuccessful.
	mutex sync.RWMutex
	// appProbersDestination is the host application probes are sent to.
	appProbersDestination string
	// appKubeProbers are the decoded application probers keyed by URL path.
	appKubeProbers KubeAppProbers
	// appProbeClient caches one HTTP client per httpGet prober path.
	appProbeClient map[string]*http.Client
	statusPort     uint16
	// lastProbeSuccessful records the previous readiness result so the "ready" transition
	// is logged only once.
	lastProbeSuccessful bool
	envoyStatsPort      int
	fetchDNS            func() *dnsProto.NameTable
	// upstreamLocalAddress is the source address used when dialing the application.
	upstreamLocalAddress *net.TCPAddr
	// config is the Options the server was created with.
	config Options
}
// init creates the agent's own Prometheus registry, registers process and Go runtime
// collectors under the "istio_agent_" prefix, and exports OpenCensus views into that same
// registry. The original note says Go collector metrics collide with other metrics;
// presumably that is why everything is prefixed.
func init() {
	reg := prometheus.NewRegistry()
	prefixed := prometheus.WrapRegistererWithPrefix("istio_agent_", prometheus.Registerer(reg))
	prefixed.MustRegister(
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
		collectors.NewGoCollector(),
	)
	promRegistry = reg

	ocExporter, err := ocprom.NewExporter(ocprom.Options{Registry: reg, Registerer: prefixed})
	if err != nil {
		log.Fatalf("could not setup exporter: %v", err)
	}
	view.RegisterExporter(ocExporter)
}
// NewServer creates a new status server from config.
//
// It does three things:
//   - Assembles the readiness probers: the Envoy readiness probe unless NoEnvoy is set, a
//     gRPC bootstrap probe when GRPCBootstrap is set, then config.Probes.
//   - Loads the optional application Prometheus scrape configuration from
//     ISTIO_PROMETHEUS_ANNOTATIONS (sidecars only).
//   - Decodes and validates config.KubeAppProbers, creating one cached HTTP client per
//     httpGet prober.
//
// An error is returned if any of that configuration is malformed or invalid. JSON decoding
// errors are wrapped, so callers can inspect them with errors.As.
func NewServer(config Options) (*Server, error) {
	localhost := localHostIPv4
	upstreamLocalAddress := UpstreamLocalAddressIPv4
	if config.IPv6 {
		localhost = localHostIPv6
		upstreamLocalAddress = UpstreamLocalAddressIPv6
	}
	probes := make([]ready.Prober, 0)
	if !config.NoEnvoy {
		probes = append(probes, &ready.Probe{
			LocalHostAddr: localhost,
			AdminPort:     config.AdminPort,
			Context:       config.Context,
			NoEnvoy:       config.NoEnvoy,
		})
	}
	if config.GRPCBootstrap != "" {
		probes = append(probes, grpcready.NewProbe(config.GRPCBootstrap))
	}
	probes = append(probes, config.Probes...)
	s := &Server{
		statusPort:            config.StatusPort,
		ready:                 probes,
		appProbersDestination: wrapIPv6(config.PodIP),
		envoyStatsPort:        config.EnvoyPrometheusPort,
		fetchDNS:              config.FetchDNS,
		upstreamLocalAddress:  upstreamLocalAddress,
		config:                config,
	}
	if LegacyLocalhostProbeDestination.Get() {
		s.appProbersDestination = "localhost"
	}
	// Enable application metrics merging only if it is configured and this is a sidecar.
	// Because port 15020 is exposed in the gateway Services, we cannot safely serve this
	// endpoint there. If we need to in the future, we should use Envoy for routing or have
	// another, internal-only port. For now it is not needed for gateways, as we can get Envoy
	// stats directly; if we want to expose istio-agent metrics we may want to revisit this.
	if cfg, f := PrometheusScrapingConfig.Lookup(); config.NodeType == model.SidecarProxy && f {
		var prom PrometheusScrapeConfiguration
		if err := json.Unmarshal([]byte(cfg), &prom); err != nil {
			return nil, fmt.Errorf("failed to unmarshal %s: %w", PrometheusScrapingConfig.Name, err)
		}
		log.Infof("Prometheus scraping configuration: %v", prom)
		if prom.Scrape != "false" {
			s.prometheus = &prom
			if s.prometheus.Path == "" {
				s.prometheus.Path = "/metrics"
			}
			if s.prometheus.Port == "" {
				s.prometheus.Port = "80"
			}
			// Scraping our own status port would make handleStats call itself recursively.
			if s.prometheus.Port == strconv.Itoa(int(config.StatusPort)) {
				return nil, fmt.Errorf("invalid prometheus scrape configuration: "+
					"application port is the same as agent port, which may lead to a recursive loop. "+
					"Ensure pod does not have prometheus.io/port=%d label, or that injection is not happening multiple times", config.StatusPort)
			}
		}
	}
	if config.KubeAppProbers == "" {
		return s, nil
	}
	if err := json.Unmarshal([]byte(config.KubeAppProbers), &s.appKubeProbers); err != nil {
		return nil, fmt.Errorf("failed to decode app prober err = %w, json string = %v", err, config.KubeAppProbers)
	}
	s.appProbeClient = make(map[string]*http.Client, len(s.appKubeProbers))
	// Validate that each map key matches the regex pattern and each prober is well formed.
	for path, prober := range s.appKubeProbers {
		err := validateAppKubeProber(path, prober)
		if err != nil {
			return nil, err
		}
		if prober.HTTPGet != nil {
			d := &net.Dialer{
				LocalAddr: s.upstreamLocalAddress,
			}
			// Construct an http client and cache it in order to reuse the connection.
			s.appProbeClient[path] = &http.Client{
				Timeout: time.Duration(prober.TimeoutSeconds) * time.Second,
				// We skip the verification since kubelet skips the verification for HTTPS prober as well
				// https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#configure-probes
				Transport: &http.Transport{
					TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
					DialContext:     d.DialContext,
					// https://github.com/kubernetes/kubernetes/blob/0153febd9f0098d4b8d0d484927710eaf899ef40/pkg/probe/http/http.go#L55
					// Match Kubernetes logic. This also ensures idle timeouts do not trigger probe failures
					DisableKeepAlives: !ProbeKeepaliveConnections,
				},
				CheckRedirect: redirectChecker(),
			}
		}
	}
	return s, nil
}
// isRedirect reports whether code is a 3xx status, i.e. in [300, 400).
// It mirrors kubelet: https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L129-L130
func isRedirect(code int) bool {
	return !(code < http.StatusMultipleChoices || code >= http.StatusBadRequest)
}
// redirectChecker returns an http.Client CheckRedirect hook that follows redirects the way
// kubelet does (https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L141):
//   - a redirect to a different hostname is not followed; the redirect response itself is
//     returned, which the probe treats as success
//     (https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L130)
//   - a redirect to the same hostname is followed
//   - after 10 redirects the request fails
func redirectChecker() func(*http.Request, []*http.Request) error {
	const maxRedirects = 10
	return func(next *http.Request, previous []*http.Request) error {
		switch {
		case next.URL.Hostname() != previous[0].URL.Hostname():
			return http.ErrUseLastResponse
		case len(previous) >= maxRedirects:
			return errors.New("stopped after 10 redirects")
		default:
			return nil
		}
	}
}
// validateAppKubeProber checks one KubeAppProbers entry. It returns an error if:
//   - path does not match appProberPattern;
//   - prober is nil (a JSON `null` entry);
//   - prober does not set exactly one of HTTPGet, TCPSocket and GRPC;
//   - an HTTPGet or TCPSocket prober uses a non-integer (named) port.
func validateAppKubeProber(path string, prober *Prober) error {
	if !appProberPattern.MatchString(path) {
		return fmt.Errorf(`invalid path, must be in form of regex pattern %v`, appProberPattern)
	}
	// A `null` value in the probers JSON decodes to a nil *Prober; reject it here instead of
	// dereferencing it below.
	if prober == nil {
		return fmt.Errorf("invalid prober config for %v, the prober must not be null", path)
	}
	count := 0
	if prober.HTTPGet != nil {
		count++
	}
	if prober.TCPSocket != nil {
		count++
	}
	if prober.GRPC != nil {
		count++
	}
	if count != 1 {
		return fmt.Errorf(`invalid prober type, must be one of type httpGet, tcpSocket or gRPC`)
	}
	// Only numeric ports can be resolved here; named ports are not supported.
	if prober.HTTPGet != nil && prober.HTTPGet.Port.Type != intstr.Int {
		return fmt.Errorf("invalid prober config for %v, the port must be int type", path)
	}
	if prober.TCPSocket != nil && prober.TCPSocket.Port.Type != intstr.Int {
		return fmt.Errorf("invalid prober config for %v, the port must be int type", path)
	}
	return nil
}
// FormatProberURL returns the readiness, liveness and startup URL paths (in that order)
// that the pilot agent serves to take over the Kubernetes app probers of container.
func FormatProberURL(container string) (string, string, string) {
	probeURL := func(kind string) string {
		return fmt.Sprintf("/app-health/%v/%s", container, kind)
	}
	return probeURL("readyz"), probeURL("livez"), probeURL("startupz")
}
// Run opens the status port and serves the agent's HTTP endpoints until ctx is done.
// The endpoints are readiness, merged Prometheus stats, quit, application probes, pprof
// and /debug/ndsz.
//
// When the status port is 0 (used by tests), the port the OS allocated is stored back into
// s.statusPort. If the HTTP server fails while ctx is still live, the agent sends SIGTERM
// to itself: otherwise it could never pass readiness or liveness probes again.
func (s *Server) Run(ctx context.Context) {
	log.Infof("Opening status port %d", s.statusPort)

	routes := []struct {
		pattern string
		handler http.HandlerFunc
	}{
		{readyPath, s.handleReadyProbe},
		// Default path for Prometheus.
		{`/metrics`, s.handleStats},
		// Envoy uses this path, and so did the original agent; kept for backward
		// compatibility with existing configs.
		{`/stats/prometheus`, s.handleStats},
		{quitPath, s.handleQuit},
		{"/app-health/", s.handleAppProbe},
		{"/debug/pprof/", s.handlePprofIndex},
		{"/debug/pprof/cmdline", s.handlePprofCmdline},
		{"/debug/pprof/profile", s.handlePprofProfile},
		{"/debug/pprof/symbol", s.handlePprofSymbol},
		{"/debug/pprof/trace", s.handlePprofTrace},
		{"/debug/ndsz", s.handleNdsz},
	}
	mux := http.NewServeMux()
	for _, rt := range routes {
		mux.HandleFunc(rt.pattern, rt.handler)
	}

	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.statusPort))
	if err != nil {
		log.Errorf("Error listening on status port: %v", err.Error())
		return
	}
	defer listener.Close()

	if s.statusPort == 0 {
		bound := listener.Addr().String()
		port, _ := strconv.Atoi(bound[strings.LastIndex(bound, ":")+1:])
		s.mutex.Lock()
		s.statusPort = uint16(port)
		s.mutex.Unlock()
	}

	go func() {
		serveErr := http.Serve(listener, mux)
		if serveErr == nil {
			return
		}
		log.Error(serveErr)
		if ctx.Err() != nil {
			// We are shutting down already, don't trigger SIGTERM.
			return
		}
		notifyExit()
	}()

	// Block until the agent is shut down.
	<-ctx.Done()
	log.Info("Status server has successfully terminated")
}
// servePprofIfLocal runs handler only for requests coming from a loopback address and
// answers every other caller with 403 Forbidden.
func servePprofIfLocal(w http.ResponseWriter, r *http.Request, handler func(http.ResponseWriter, *http.Request)) {
	if isRequestFromLocalhost(r) {
		handler(w, r)
		return
	}
	http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
}

// handlePprofIndex serves pprof.Index to localhost callers only.
func (s *Server) handlePprofIndex(w http.ResponseWriter, r *http.Request) {
	servePprofIfLocal(w, r, pprof.Index)
}

// handlePprofCmdline serves pprof.Cmdline to localhost callers only.
func (s *Server) handlePprofCmdline(w http.ResponseWriter, r *http.Request) {
	servePprofIfLocal(w, r, pprof.Cmdline)
}

// handlePprofSymbol serves pprof.Symbol to localhost callers only.
func (s *Server) handlePprofSymbol(w http.ResponseWriter, r *http.Request) {
	servePprofIfLocal(w, r, pprof.Symbol)
}

// handlePprofProfile serves pprof.Profile to localhost callers only.
func (s *Server) handlePprofProfile(w http.ResponseWriter, r *http.Request) {
	servePprofIfLocal(w, r, pprof.Profile)
}

// handlePprofTrace serves pprof.Trace to localhost callers only.
func (s *Server) handlePprofTrace(w http.ResponseWriter, r *http.Request) {
	servePprofIfLocal(w, r, pprof.Trace)
}
// handleReadyProbe answers agent readiness checks with 200 when every configured prober
// passes and 503 otherwise. The "ready" message is logged only when the previous result
// was not successful (including the very first success).
func (s *Server) handleReadyProbe(w http.ResponseWriter, _ *http.Request) {
	readinessErr := s.isReady()

	s.mutex.Lock()
	defer s.mutex.Unlock()

	if readinessErr != nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		log.Warnf("Envoy proxy is NOT ready: %s", readinessErr.Error())
		s.lastProbeSuccessful = false
		return
	}

	w.WriteHeader(http.StatusOK)
	if !s.lastProbeSuccessful {
		log.Info("Envoy proxy is ready")
	}
	s.lastProbeSuccessful = true
}
// isReady runs the readiness probers in order and returns the first failure, or nil if
// all of them pass.
func (s *Server) isReady() error {
	for i := range s.ready {
		if checkErr := s.ready[i].Check(); checkErr != nil {
			return checkErr
		}
	}
	return nil
}
// isRequestFromLocalhost reports whether r's remote address is a loopback IP. A RemoteAddr
// that is not "host:port", or whose host is not an IP literal, is treated as non-local.
func isRequestFromLocalhost(r *http.Request) bool {
	if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
		return net.ParseIP(host).IsLoopback()
	}
	return false
}
// PrometheusScrapeConfiguration describes how to scrape the application's own Prometheus
// metrics. NewServer decodes it from the JSON in ISTIO_PROMETHEUS_ANNOTATIONS.
type PrometheusScrapeConfiguration struct {
	// Scrape disables application scraping when set to "false".
	Scrape string `json:"scrape"`
	// Path is the HTTP path of the metrics endpoint; NewServer defaults it to "/metrics".
	Path string `json:"path"`
	// Port is the metrics port, as a string; NewServer defaults it to "80" and rejects the
	// agent's own status port.
	Port string `json:"port"`
}
// handleStats serves merged Prometheus metrics. The response contains, in order:
//   - the agent's own metrics;
//   - Envoy's metrics, unless NoEnvoy is set;
//   - the application's metrics, when configured.
//
// The merge is plain concatenation. This works as long as the application does not expose
// the same metrics as Envoy, and it works for both FmtText and FmtOpenMetrics. The response
// Content-Type follows the application's format, or FmtText when there is no application
// scrape.
//
// No error is ever returned to the caller, because that would drop every metric (e.g. a
// struggling app would hide Envoy metrics). Failures are logged and counted in the
// scrape-error metrics instead.
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
	metrics.ScrapeTotals.Increment()

	var (
		envoyStats, appStats, agentStats []byte
		err                              error
	)

	if !s.config.NoEnvoy {
		envoyURL := fmt.Sprintf("http://localhost:%d/stats/prometheus", s.envoyStatsPort)
		envoyStats, _, err = s.scrape(envoyURL, r.Header)
		if err != nil {
			log.Errorf("failed scraping envoy metrics: %v", err)
			metrics.EnvoyScrapeErrors.Increment()
		}
		// Process Envoy's metrics to make them compatible with FmtOpenMetrics.
		envoyStats = processMetrics(envoyStats)
	}

	var format expfmt.Format
	if s.prometheus == nil {
		// Without application metrics there is no format to follow; use the default.
		format = expfmt.FmtText
	} else {
		appURL := fmt.Sprintf("http://localhost:%s%s", s.prometheus.Port, s.prometheus.Path)
		var contentType string
		appStats, contentType, err = s.scrape(appURL, r.Header)
		if err != nil {
			log.Errorf("failed scraping application metrics: %v", err)
			metrics.AppScrapeErrors.Increment()
		}
		format = negotiateMetricsFormat(contentType)
	}

	agentStats, err = scrapeAgentMetrics()
	if err != nil {
		log.Errorf("failed scraping agent metrics: %v", err)
		metrics.AgentScrapeErrors.Increment()
	}

	w.Header().Set("Content-Type", string(format))

	if _, writeErr := w.Write(agentStats); writeErr != nil {
		log.Errorf("failed to write agent metrics: %v", writeErr)
		metrics.AgentScrapeErrors.Increment()
	}
	if envoyStats != nil {
		if _, writeErr := w.Write(envoyStats); writeErr != nil {
			log.Errorf("failed to write envoy metrics: %v", writeErr)
			metrics.EnvoyScrapeErrors.Increment()
		}
	}
	// Application metrics must come last: in FmtOpenMetrics they end with "# EOF",
	// which terminates the whole exposition.
	if _, writeErr := w.Write(appStats); writeErr != nil {
		log.Errorf("failed to write application metrics: %v", writeErr)
		metrics.AppScrapeErrors.Increment()
	}
}
// negotiateMetricsFormat maps an application's Content-Type header to the exposition
// format of the merged response. It returns FmtOpenMetrics only for a parseable
// OpenMetrics media type, and FmtText otherwise (including for an empty header).
func negotiateMetricsFormat(contentType string) expfmt.Format {
	if mediaType, _, err := mime.ParseMediaType(contentType); err != nil || mediaType != expfmt.OpenMetricsType {
		return expfmt.FmtText
	}
	return expfmt.FmtOpenMetrics
}
// processMetrics makes Envoy's text exposition embeddable in an OpenMetrics response by
// removing blank lines: every run of consecutive newlines is collapsed to a single newline.
// A single bytes.ReplaceAll pass would only halve such runs (e.g. "\n\n\n" -> "\n\n") and
// leave blank lines behind. Input without blank lines, including nil, is returned as is.
func processMetrics(metrics []byte) []byte {
	if !bytes.Contains(metrics, []byte("\n\n")) {
		return metrics
	}
	out := make([]byte, 0, len(metrics))
	for i, c := range metrics {
		// Skip a newline that directly follows another newline.
		if c == '\n' && i > 0 && metrics[i-1] == '\n' {
			continue
		}
		out = append(out, c)
	}
	return out
}
// scrapeAgentMetrics gathers the agent's own registry and encodes it in the Prometheus text
// format. A Gather failure returns (nil, err). Per-family encoding failures are collected
// into a multierror that is returned alongside whatever was encoded.
func scrapeAgentMetrics() ([]byte, error) {
	families, err := promRegistry.Gather()
	if err != nil {
		return nil, err
	}

	var out bytes.Buffer
	encoder := expfmt.NewEncoder(&out, expfmt.FmtText)
	var encodeErrs error
	for _, family := range families {
		if encErr := encoder.Encode(family); encErr != nil {
			encodeErrs = multierror.Append(encodeErrs, encErr)
		}
	}
	return out.Bytes(), encodeErrs
}
// applyHeaders copies the first value of each named header from `from` into `into`,
// replacing any existing value. Headers that are absent or empty in `from` are skipped.
func applyHeaders(into http.Header, from http.Header, keys ...string) {
	for _, name := range keys {
		if value := from.Get(name); value != "" {
			into.Set(name, value)
		}
	}
}
// getHeaderTimeout parses a number of seconds such as "1.234", as sent by Prometheus in
// X-Prometheus-Scrape-Timeout-Seconds, into a time.Duration.
//
// It returns an error if the string is not a float, or if the value is NaN, negative, or
// too large for a time.Duration. Converting such floats to an integer is
// implementation-defined in Go, so they would yield arbitrary timeouts.
func getHeaderTimeout(timeout string) (time.Duration, error) {
	timeoutSeconds, err := strconv.ParseFloat(timeout, 64)
	if err != nil {
		return 0, err
	}
	nanos := timeoutSeconds * float64(time.Second)
	// float64(1<<63) is exactly 2^63, one past the largest int64, so the strict comparison
	// keeps the conversion in range. The negated form also rejects NaN.
	if !(nanos >= 0 && nanos < float64(1<<63)) {
		return 0, fmt.Errorf("timeout %q out of range", timeout)
	}
	return time.Duration(nanos), nil
}
// scrape sends a GET request to url and returns the metrics body along with the response's
// Content-Type header, which callers use to determine the metrics format.
//
// To mimic Prometheus, the Accept, User-Agent and X-Prometheus-Scrape-Timeout-Seconds
// headers are passed through from header. A parseable timeout header also bounds the
// request; an unparseable one is logged and ignored.
//
// A transport error, a non-200 status, or a read failure is returned as an error.
// Underlying errors are wrapped, so errors.Is(err, context.DeadlineExceeded) works.
func (s *Server) scrape(url string, header http.Header) ([]byte, string, error) {
	ctx := context.Background()
	if timeoutString := header.Get("X-Prometheus-Scrape-Timeout-Seconds"); timeoutString != "" {
		timeout, err := getHeaderTimeout(timeoutString)
		if err != nil {
			log.Warnf("Failed to parse timeout header %v: %v", timeoutString, err)
		} else {
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, timeout)
			defer cancel()
		}
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, "", err
	}
	applyHeaders(req.Header, header, "Accept",
		"User-Agent",
		"X-Prometheus-Scrape-Timeout-Seconds",
	)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, "", fmt.Errorf("error scraping %s: %w", url, err)
	}
	defer func() {
		// Drain before closing so the keep-alive connection can be reused even when we
		// return early on a non-200 status.
		_, _ = io.Copy(io.Discard, resp.Body)
		_ = resp.Body.Close()
	}()
	if resp.StatusCode != http.StatusOK {
		return nil, "", fmt.Errorf("error scraping %s, status code: %v", url, resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, "", fmt.Errorf("error reading %s: %w", url, err)
	}
	return body, resp.Header.Get("Content-Type"), nil
}
// handleQuit lets a local caller stop the agent. Only POST requests from a loopback address
// are accepted (403 and 405 otherwise). On success it replies "OK" and then sends SIGTERM to
// the agent's own process.
func (s *Server) handleQuit(w http.ResponseWriter, r *http.Request) {
	switch {
	case !isRequestFromLocalhost(r):
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
	case r.Method != http.MethodPost:
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
	default:
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("OK"))
		log.Infof("handling %s, notifying pilot-agent to exit", quitPath)
		notifyExit()
	}
}
// handleAppProbe serves /app-health/... requests by running the matching Kubernetes
// application prober. Paths with no configured prober get 400. Otherwise the request is
// dispatched on the prober type.
func (s *Server) handleAppProbe(w http.ResponseWriter, req *http.Request) {
	probePath := req.URL.Path
	if !strings.HasPrefix(probePath, "/") {
		probePath = "/" + probePath
	}

	prober, found := s.appKubeProbers[probePath]
	if !found {
		log.Errorf("Prober does not exists url %v", probePath)
		w.WriteHeader(http.StatusBadRequest)
		_, _ = fmt.Fprintf(w, "app prober config does not exists for %v", probePath)
		return
	}

	switch {
	case prober.HTTPGet != nil:
		s.handleAppProbeHTTPGet(w, req, prober, probePath)
	case prober.TCPSocket != nil:
		s.handleAppProbeTCPSocket(w, prober)
	case prober.GRPC != nil:
		s.handleAppProbeGRPC(w, req, prober)
	}
}
// handleAppProbeHTTPGet performs an httpGet application probe on behalf of the kubelet.
//
// The request goes to s.appProbersDestination on the prober's port and path, using the
// prober's scheme. Incoming request headers are forwarded first. The prober's own
// HTTPHeaders then replace any forwarded header with the same (case-insensitive) name, and
// a "Host" or ":authority" entry overrides the request host.
//
// Only the status code is relayed back:
//   - 3xx responses become 200;
//   - any other status is passed through;
//   - transport failures yield 500.
func (s *Server) handleAppProbeHTTPGet(w http.ResponseWriter, req *http.Request, prober *Prober, path string) {
	proberPath := prober.HTTPGet.Path
	if !strings.HasPrefix(proberPath, "/") {
		proberPath = "/" + proberPath
	}
	scheme := "http"
	if prober.HTTPGet.Scheme == apimirror.URISchemeHTTPS {
		scheme = "https"
	}
	url := fmt.Sprintf("%s://%s:%v%s", scheme, s.appProbersDestination, prober.HTTPGet.Port.IntValue(), proberPath)
	appReq, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		log.Errorf("Failed to create request to probe app %v, original url %v", err, path)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// Forward incoming headers to the application, copying each value slice so the two
	// requests never share backing arrays.
	for name, values := range req.Header {
		newValues := make([]string, len(values))
		copy(newValues, values)
		appReq.Header[name] = newValues
	}

	// If there are custom HTTPHeaders, they override the forwarded headers.
	if headers := prober.HTTPGet.HTTPHeaders; len(headers) != 0 {
		for _, h := range headers {
			// Header.Del canonicalizes the name, so a custom "x-foo" also removes a forwarded
			// "X-Foo". A raw map delete would miss it and the app would receive both values.
			appReq.Header.Del(h.Name)
		}
		for _, h := range headers {
			if strings.EqualFold(h.Name, "Host") || strings.EqualFold(h.Name, ":authority") {
				// The probe has a specific host override; honor it. The client takes the Host
				// header from appReq.Host and ignores Header["Host"]. ":authority" is not a
				// valid HTTP/1 header field name, and the transport rejects a request carrying
				// it, so only appReq.Host is set.
				appReq.Host = h.Value
				continue
			}
			appReq.Header.Add(h.Name, h.Value)
		}
	}

	// NewServer caches a client for every httpGet prober path, and handleAppProbe only
	// dispatches here for such paths, so the lookup always succeeds.
	httpClient := s.appProbeClient[path]
	response, err := httpClient.Do(appReq)
	if err != nil {
		log.Errorf("Request to probe app failed: %v, original URL path = %v\napp URL path = %v", err, path, proberPath)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	defer func() {
		// Drain and close the body to let the Transport reuse the connection.
		_, _ = io.Copy(io.Discard, response.Body)
		_ = response.Body.Close()
	}()

	if isRedirect(response.StatusCode) {
		// A redirect status without a Location header would be illegal, so report 200 instead.
		w.WriteHeader(http.StatusOK)
		return
	}
	// Only the status code is written to the response.
	w.WriteHeader(response.StatusCode)
}
// handleAppProbeTCPSocket performs a tcpSocket application probe. It dials the prober's port
// on s.appProbersDestination from s.upstreamLocalAddress, limited by TimeoutSeconds. It
// replies 200 if the connection opens (closing it immediately) and 500 otherwise.
func (s *Server) handleAppProbeTCPSocket(w http.ResponseWriter, prober *Prober) {
	dialer := &net.Dialer{
		Timeout:   time.Duration(prober.TimeoutSeconds) * time.Second,
		LocalAddr: s.upstreamLocalAddress,
	}
	target := fmt.Sprintf("%s:%d", s.appProbersDestination, prober.TCPSocket.Port.IntValue())

	conn, err := dialer.Dial("tcp", target)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	_ = conn.Close()
}
// handleAppProbeGRPC performs a grpc application probe with the standard gRPC health
// protocol (grpc.health.v1.Health/Check). It replies 200 only when the service reports
// SERVING, and 500 on dial failures, RPC errors or any other status.
//
// Dial options and error handling follow kubelet:
//   - https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L55-L59
//   - https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L88-L106
//
// The dial and the check share a single TimeoutSeconds deadline.
func (s *Server) handleAppProbeGRPC(w http.ResponseWriter, req *http.Request, prober *Prober) {
	timeout := time.Duration(prober.TimeoutSeconds) * time.Second

	dialFromUpstream := func(ctx context.Context, addr string) (net.Conn, error) {
		d := net.Dialer{
			LocalAddr: s.upstreamLocalAddress,
			Timeout:   timeout,
		}
		return d.DialContext(ctx, "tcp", addr)
	}
	opts := []grpc.DialOption{
		grpc.WithBlock(),
		// Credentials are currently not supported.
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(dialFromUpstream),
	}
	// Simulate kubelet by forwarding its User-Agent, see:
	// https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L56
	// https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/http/http.go#L103
	if agents := req.Header["User-Agent"]; len(agents) > 0 {
		opts = append(opts, grpc.WithUserAgent(agents[0]))
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	target := fmt.Sprintf("%s:%d", s.appProbersDestination, prober.GRPC.Port)
	conn, err := grpc.DialContext(ctx, target, opts...)
	if err != nil {
		log.Errorf("Failed to create grpc connection to probe app: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	defer conn.Close()

	service := ""
	if prober.GRPC.Service != nil {
		service = *prober.GRPC.Service
	}

	resp, err := grpcHealth.NewHealthClient(conn).Check(ctx, &grpcHealth.HealthCheckRequest{
		Service: service,
	})
	if err != nil {
		st, isStatus := grpcStatus.FromError(err)
		switch {
		case isStatus && st.Code() == codes.Unimplemented:
			log.Errorf("server does not implement the grpc health protocol (grpc.health.v1.Health): %v", err)
		case isStatus && st.Code() == codes.DeadlineExceeded:
			log.Errorf("grpc request not finished within timeout: %v", err)
		default:
			log.Errorf("grpc probe failed: %v", err)
		}
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if resp.GetStatus() != grpcHealth.HealthCheckResponse_SERVING {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}
// handleNdsz serves the DNS name table as JSON to localhost callers only (403 otherwise).
// It replies 404 with `{}` when no table is available, including when the server was
// created without an Options.FetchDNS callback; previously that case panicked on a nil
// function call.
func (s *Server) handleNdsz(w http.ResponseWriter, r *http.Request) {
	if !isRequestFromLocalhost(r) {
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
		return
	}
	var nametable *dnsProto.NameTable
	if s.fetchDNS != nil {
		nametable = s.fetchDNS()
	}
	if nametable == nil {
		// See https://golang.org/doc/faq#nil_error for why writeJSONProto cannot handle this:
		// a nil *NameTable inside an interface{} is not a nil interface.
		w.WriteHeader(http.StatusNotFound)
		_, _ = w.Write([]byte(`{}`))
		return
	}
	writeJSONProto(w, nametable)
}
// writeJSONProto writes obj as a JSON payload with an application/json content type.
// If marshaling fails, it replies 500 with the error text as the body.
func writeJSONProto(w http.ResponseWriter, obj interface{}) {
	w.Header().Set("Content-Type", "application/json")
	b, err := config.ToJSON(obj)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte(err.Error()))
		return
	}
	if _, err := w.Write(b); err != nil {
		// Write has already sent an implicit 200 status, so calling WriteHeader now would only
		// be a superfluous call that net/http ignores. Log the failure instead.
		log.Errorf("failed to write JSON response: %v", err)
	}
}
// notifyExit sends SIGTERM to the current process to trigger graceful termination.
// Failures are logged rather than returned. If the process handle cannot be obtained, no
// signal is attempted; previously the code went on to call Signal on a nil *os.Process.
func notifyExit() {
	p, err := os.FindProcess(os.Getpid())
	if err != nil {
		log.Error(err)
		return
	}
	if err := p.Signal(syscall.SIGTERM); err != nil {
		log.Errorf("failed to send SIGTERM to self: %v", err)
	}
}
// wrapIPv6 encloses an IPv6 literal in square brackets so that it can be followed by
// ":port". IPv4 addresses (including IPv4-mapped IPv6 forms) and strings that do not parse
// as an IP (such as "" or a hostname) are returned unchanged.
func wrapIPv6(ipAddr string) string {
	if ip := net.ParseIP(ipAddr); ip != nil && ip.To4() == nil {
		return fmt.Sprintf("[%s]", ipAddr)
	}
	return ipAddr
}