blob: 0f4089ed5a7cc27db2a79e6c4cd314ae69b2adb8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package webhook
import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
dubbo_cp "github.com/apache/dubbo-kubernetes/pkg/config/app/dubbo-cp"
"github.com/apache/dubbo-kubernetes/pkg/core/logger"
"github.com/mattbaird/jsonpatch"
admissionV1 "k8s.io/api/admission/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type (
PodPatch func(*v1.Pod) (*v1.Pod, error)
GetCertificate func(*tls.ClientHelloInfo) (*tls.Certificate, error)
)
type Webhook struct {
Patches []PodPatch
AllowOnErr bool
getCertificate GetCertificate
Server *http.Server
}
func NewWebhook(certificate GetCertificate) *Webhook {
return &Webhook{
getCertificate: certificate,
AllowOnErr: true,
}
}
func (wh *Webhook) NewServer(port int32) *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/health", wh.ServeHealth)
mux.HandleFunc("/mutating-services", wh.Mutate)
return &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
TLSConfig: &tls.Config{
GetCertificate: wh.getCertificate,
},
}
}
func (wh *Webhook) Init(options *dubbo_cp.Config) {
wh.Server = wh.NewServer(options.Webhook.Port)
wh.AllowOnErr = options.Webhook.AllowOnErr
}
// Serve only for test
func (wh *Webhook) Serve() {
err := wh.Server.ListenAndServeTLS("", "")
if err != nil {
logger.Sugar().Warnf("[Webhook] Serve webhook cp-server failed. %v", err.Error())
return
}
}
// Stop only for test
func (wh *Webhook) Stop() {
if err := wh.Server.Close(); err != nil {
logger.Sugar().Warnf("[Webhook] Stop webhook cp-server failed. %v", err.Error())
return
}
}
// ServeHealth returns 200 when things are good
func (wh *Webhook) ServeHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func (wh *Webhook) Mutate(w http.ResponseWriter, r *http.Request) {
var body []byte
if r.Body != nil {
if data, err := io.ReadAll(r.Body); err == nil {
body = data
}
}
logger.Sugar().Infof("[Webhook] Mutation request: " + string(body))
// verify the content type is accurate
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
outputLog := fmt.Sprintf("[Webhook] contentType=%s, expect application/json", contentType)
logger.Sugar().Errorf(outputLog)
w.WriteHeader(http.StatusUnsupportedMediaType)
return
}
var reviewResponse *admissionV1.AdmissionResponse
ar := admissionV1.AdmissionReview{}
if err := json.Unmarshal(body, &ar); err != nil {
outputLog := fmt.Sprintf("[Webhook] json unmarshal err=%s", err)
logger.Sugar().Errorf(outputLog)
reviewResponse = &admissionV1.AdmissionResponse{
Allowed: wh.AllowOnErr,
Result: &metav1.Status{
Status: "Failure",
Message: err.Error(),
Reason: metav1.StatusReason(err.Error()),
},
}
} else {
reviewResponse, err = wh.Admit(ar)
if err != nil {
logger.Sugar().Errorf(err.Error())
reviewResponse = &admissionV1.AdmissionResponse{
Allowed: wh.AllowOnErr,
Result: &metav1.Status{
Status: "Failure",
Message: err.Error(),
Reason: metav1.StatusReason(err.Error()),
},
}
}
}
response := admissionV1.AdmissionReview{}
response.TypeMeta.Kind = "AdmissionReview"
response.TypeMeta.APIVersion = "admission.k8s.io/v1"
response.Response = reviewResponse
logger.Sugar().Infof("[Webhook] AdmissionReview response: %v", response)
resp, err := json.Marshal(response)
if err != nil {
outputLog := fmt.Sprintf("[Webhook] response json unmarshal err=%s", err)
logger.Sugar().Errorf(outputLog)
}
if _, err := w.Write(resp); err != nil {
outputLog := fmt.Sprintf("[Webhook] write resp err=%s", err)
logger.Sugar().Errorf(outputLog)
}
}
func (wh *Webhook) Admit(ar admissionV1.AdmissionReview) (*admissionV1.AdmissionResponse, error) {
if ar.Request == nil {
return nil, fmt.Errorf("[Webhook] AdmissionReview request is nil")
}
reviewResponse := &admissionV1.AdmissionResponse{
Allowed: true,
UID: ar.Request.UID,
}
podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
if ar.Request.Resource != podResource {
outputLog := fmt.Sprintf("[Webhook] expect resource to be pods, but actual is %s", ar.Request.Resource)
return nil, fmt.Errorf(outputLog)
}
raw := ar.Request.Object.Raw
pod := v1.Pod{}
if err := json.Unmarshal(raw, &pod); err != nil {
outputLog := fmt.Sprintf("[Webhook] pod unmarshal error. %s", err)
return nil, fmt.Errorf(outputLog)
}
patchBytes, err := wh.PatchPod(&pod)
if err != nil {
outputLog := fmt.Sprintf("[Webhook] Patch error: %v. Msg: %s", pod.ObjectMeta.Name, err.Error())
return nil, fmt.Errorf(outputLog)
}
reviewResponse.Patch = patchBytes
logger.Sugar().Infof("[Webhook] Patch after mutate : %s", string(patchBytes))
pt := admissionV1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
return reviewResponse, nil
}
func (wh *Webhook) PatchPod(pod *v1.Pod) ([]byte, error) {
origin, originErr := json.Marshal(pod)
if originErr == nil {
logger.Sugar().Infof("[Webhook] Pod before mutate: %v", string(origin))
} else {
return nil, originErr
}
for _, patch := range wh.Patches {
patched, err := patch(pod)
if err != nil {
return nil, fmt.Errorf("[Webhook] Pod patch failed: %s", err.Error())
}
pod = patched
}
after, afterErr := json.Marshal(pod)
if afterErr == nil {
logger.Sugar().Infof("[Webhook] Pod after mutate: %v", string(after))
} else {
return nil, afterErr
}
patch, patchErr := jsonpatch.CreatePatch(origin, after)
if patchErr != nil {
return nil, patchErr
}
return json.Marshal(patch)
}