blob: 0528310e66038a624159c1f11875d0e5bb79e6e1 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package util
import (
admissionv1 ""
corev1 ""
metav1 ""
var (
// runtimeScheme is the scheme the codec factory below is built from.
runtimeScheme = runtime.NewScheme()
// codecs provides serializers for specific versions and content types.
codecs = serializer.NewCodecFactory(runtimeScheme)
// deserializer decodes raw request bodies into API objects; it is used to
// parse incoming AdmissionReview payloads.
deserializer = codecs.UniversalDeserializer()
// httpClient is the shared client for outgoing HTTP requests. Each request is
// bounded by a 15 second timeout.
httpClient = http.Client{
Timeout: time.Second * 15,
// jsonContentType is sent as the Accept header when querying the metrics
// endpoint of a shuffle server.
jsonContentType = "application/json"
// AdmissionReviewHandler handles an AdmissionReview and returns the review
// with its response set; the returned value is what gets serialized back to
// the caller.
type AdmissionReviewHandler func(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionReview
// handleResponse write message to http response.
func handleResponse(w http.ResponseWriter, status int, message string) {
if _, err := w.Write([]byte(message)); err != nil {
klog.Errorf("write message (%v) failed: %v", message, err)
// AdmissionReviewFailed returns error for the AdmissionReview.
func AdmissionReviewFailed(ar *admissionv1.AdmissionReview,
err error) *admissionv1.AdmissionReview {
ar.Response = &admissionv1.AdmissionResponse{
UID: ar.Request.UID,
Result: &metav1.Status{
Message: fmt.Sprintf("handle admission review failed: %v", err),
return ar
// AdmissionReviewAllow allows the AdmissionReview.
func AdmissionReviewAllow(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionReview {
ar.Response = &admissionv1.AdmissionResponse{
UID: ar.Request.UID,
Allowed: true,
return ar
// AdmissionReviewForbidden forbids the AdmissionReview with delete operation.
func AdmissionReviewForbidden(ar *admissionv1.AdmissionReview,
message string) *admissionv1.AdmissionReview {
ar.Response = &admissionv1.AdmissionResponse{
UID: ar.Request.UID,
Result: &metav1.Status{
Message: message,
return ar
// AdmissionReviewWithPatches returns the AdmissionReview with patches in response.
func AdmissionReviewWithPatches(ar *admissionv1.AdmissionReview,
patches []byte) *admissionv1.AdmissionReview {
ar.Response = &admissionv1.AdmissionResponse{
UID: ar.Request.UID,
Allowed: true,
Patch: patches,
PatchType: func() *admissionv1.PatchType {
pt := admissionv1.PatchTypeJSONPatch
return &pt
return ar
// WithAdmissionReviewHandler checks before InspectorFunc executes and creates a handleFunc.
func WithAdmissionReviewHandler(handler AdmissionReviewHandler) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
if req.Body == nil {
klog.Error("Receive an invalid ar, body is empty")
handleResponse(w, http.StatusBadRequest, "ar body required")
data, err := ioutil.ReadAll(req.Body)
if err != nil {
klog.Errorf("Read ar body failed: %v", err)
handleResponse(w, http.StatusInternalServerError,
fmt.Sprintf("read ar body failed: %v", err))
ar := &admissionv1.AdmissionReview{}
if _, _, err = deserializer.Decode(data, nil, ar); err != nil {
klog.Errorf("Parse ar body failed: %s, %v", string(data), err)
handleResponse(w, http.StatusBadRequest, fmt.Sprintf("parse ar failed: %v", err))
klog.V(4).Infof("receive request: %v/%v/%v from %+v, verb: %+v",
ar.Request.Namespace, ar.Request.Name, ar.Request.UID, ar.Request.UserInfo,
var respBytes []byte
respBytes, err = json.Marshal(handler(ar))
if err != nil {
handleResponse(w, http.StatusInternalServerError,
fmt.Sprintf("marshal response failed: %v", err))
if _, err := w.Write(respBytes); err != nil {
klog.Errorf("Send response failed: %v", err)
// NeedInspectPod returns whether we need to inspect the pod.
func NeedInspectPod(pod *corev1.Pod) bool {
if pod.DeletionTimestamp != nil || pod.Labels == nil {
return false
if val, ok := pod.Labels[constants.LabelShuffleServer]; ok && val == "true" {
return true
return false
// JSONFloat is a float64 that can be carried through JSON even when it is not
// finite. encoding/json refuses to encode NaN and infinities as numbers, so
// those values are represented as the quoted strings "NaN", "Infinity" and
// "-Infinity"; every finite value is an ordinary JSON number.
type JSONFloat float64

// MarshalJSON encodes j as a JSON number, or as a quoted string when j is NaN
// or an infinity.
func (j JSONFloat) MarshalJSON() ([]byte, error) {
	v := float64(j)
	switch {
	case math.IsNaN(v):
		return []byte(`"NaN"`), nil
	case math.IsInf(v, 1):
		return []byte(`"Infinity"`), nil
	case math.IsInf(v, -1):
		return []byte(`"-Infinity"`), nil
	}
	return json.Marshal(v) // marshal result as standard float64
}

// UnmarshalJSON decodes v into j. Besides JSON numbers it accepts the quoted
// strings "NaN", "Infinity"/"+Infinity"/"Inf"/"+Inf" and "-Infinity"/"-Inf".
// Any other input yields the error reported by encoding/json for a float64.
func (j *JSONFloat) UnmarshalJSON(v []byte) error {
	switch string(v) {
	case `"NaN"`:
		*j = JSONFloat(math.NaN())
		return nil
	case `"Infinity"`, `"+Infinity"`, `"Inf"`, `"+Inf"`:
		*j = JSONFloat(math.Inf(1))
		return nil
	case `"-Infinity"`, `"-Inf"`:
		*j = JSONFloat(math.Inf(-1))
		return nil
	}

	// just a regular float value
	var fv float64
	if err := json.Unmarshal(v, &fv); err != nil {
		return err
	}
	*j = JSONFloat(fv)
	return nil
}
// MetricItem records an item of metric information of shuffle servers, as
// decoded from the JSON metrics response.
type MetricItem struct {
// Name is the metric name; getLastAppNum matches it against
// "app_num_with_node".
Name string `json:"name"`
// LabelNames and LabelValues are decoded from "labelNames" and
// "labelValues"; presumably parallel slices — confirm against the producer.
LabelNames []string `json:"labelNames"`
LabelValues []string `json:"labelValues"`
// Value uses JSONFloat so that a quoted "NaN" decodes instead of failing.
Value JSONFloat `json:"value"`
// MetricList records all items of metric information of shuffle servers; it
// is the top-level object decoded from a metrics response body.
type MetricList struct {
// Metrics holds the decoded items; entries are pointers.
Metrics []*MetricItem `json:"metrics"`
// TimeStamp is decoded from "timestamp"; its unit is not visible here.
TimeStamp int64 `json:"timestamp"`
func getLastAppNum(body []byte) (int, error) {
resp := &MetricList{}
if err := json.Unmarshal(body, resp); err != nil {
klog.Errorf("unmarshal body (%v) failed: %v", string(body), err)
return 0, err
for i := range resp.Metrics {
if resp.Metrics[i].Name == "app_num_with_node" {
return int(resp.Metrics[i].Value), nil
return 0, nil
// HasZeroApps returns whether there are zero apps in the shuffle server pod.
func HasZeroApps(pod *corev1.Pod) bool {
port := utils.GetMetricsServerPort(pod)
if len(port) == 0 {
return true
if pod.Status.Phase != corev1.PodRunning {
return true
url := fmt.Sprintf("http://%v:%v/metrics/server", pod.Status.PodIP, port)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
klog.Errorf("new request failed with error: %v->%+v", url, err)
return true
// the request accept json response only
req.Header.Set("Accept", jsonContentType)
resp, err := httpClient.Do(req)
if err != nil {
klog.Errorf("send metrics server request failed: %v->%+v", url, err)
return true
if resp.StatusCode != http.StatusOK {
klog.Errorf("heartbeat response failed: invalid status (%v->%v)", url, resp.Status)
return false
body, err := io.ReadAll(resp.Body)
if err != nil {
klog.Errorf("heartbeat response failed: read body with err:%+v", err)
return false
if num, err := getLastAppNum(body); err != nil {
klog.Errorf("get last app number of (%v) failed: %v", pod.Spec.NodeName, err)
return false
} else if num > 0 {
klog.V(4).Infof("last %v apps in node %v", num, pod.Spec.NodeName)
return false
return true