blob: a04c0b6c4cb9c6e8e66293c32da027ef2308301e [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"go.uber.org/zap"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
const (
HTTPPort = 9089
policyGroupEnvVarName = "POLICY_GROUP"
schedulerServiceAddressEnvVarName = "SCHEDULER_SERVICE_ADDRESS"
schedulerValidateConfURLPattern = "http://%s/ws/v1/validate-conf"
admissionControllerNamespace = "ADMISSION_CONTROLLER_NAMESPACE"
admissionControllerService = "ADMISSION_CONTROLLER_SERVICE"
admissionControllerProcessNamespaces = "ADMISSION_CONTROLLER_PROCESS_NAMESPACES"
admissionControllerBypassNamespaces = "ADMISSION_CONTROLLER_BYPASS_NAMESPACES"
admissionControllerLabelNamespaces = "ADMISSION_CONTROLLER_LABEL_NAMESPACES"
admissionControllerNoLabelNamespaces = "ADMISSION_CONTROLLER_NO_LABEL_NAMESPACES"
defaultBypassNamespaces = "^kube-system$"
// legal URLs
mutateURL = "/mutate"
validateConfURL = "/validate-conf"
healthURL = "/health"
)
type WebHook struct {
ac *admissionController
port int
server *http.Server
sync.Mutex
}
func main() {
namespace := os.Getenv(admissionControllerNamespace)
serviceName := os.Getenv(admissionControllerService)
processNamespaces, ok := os.LookupEnv(admissionControllerProcessNamespaces)
if !ok {
processNamespaces = ""
}
bypassNamespaces, ok := os.LookupEnv(admissionControllerBypassNamespaces)
if !ok {
bypassNamespaces = defaultBypassNamespaces
}
labelNamespaces, ok := os.LookupEnv(admissionControllerLabelNamespaces)
if !ok {
labelNamespaces = ""
}
noLabelNamespaces, ok := os.LookupEnv(admissionControllerNoLabelNamespaces)
if !ok {
noLabelNamespaces = ""
}
wm, err := NewWebhookManager(namespace, serviceName)
if err != nil {
log.Logger().Fatal("Failed to initialize webhook manager", zap.Error(err))
}
policyGroup := os.Getenv(policyGroupEnvVarName)
if policyGroup == "" {
policyGroup = conf.DefaultPolicyGroup
}
schedulerServiceAddress := os.Getenv(schedulerServiceAddressEnvVarName)
ac, err := initAdmissionController(
fmt.Sprintf("%s.yaml", policyGroup),
fmt.Sprintf(schedulerValidateConfURLPattern, schedulerServiceAddress),
processNamespaces, bypassNamespaces, labelNamespaces, noLabelNamespaces)
if err != nil {
log.Logger().Fatal("failed to configure admission controller", zap.Error(err))
}
webhook := CreateWebhook(ac, HTTPPort)
certs := UpdateWebhookConfiguration(wm)
webhook.Startup(certs)
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1)
WaitForCertExpiration(wm, signalChan)
for {
switch <-signalChan {
case syscall.SIGUSR1: // reload certificates
certs := UpdateWebhookConfiguration(wm)
webhook.Shutdown()
webhook.Startup(certs)
WaitForCertExpiration(wm, signalChan)
default: // terminate
webhook.Shutdown()
os.Exit(0)
}
}
}
func WaitForCertExpiration(wm WebhookManager, ch chan os.Signal) {
go func() {
wm.WaitForCertificateExpiration()
ch <- syscall.SIGUSR1
}()
}
func UpdateWebhookConfiguration(wm WebhookManager) *tls.Certificate {
err := wm.LoadCACertificates()
if err != nil {
log.Logger().Fatal("Failed to initialize CA certificates", zap.Error(err))
}
certs, err := wm.GenerateServerCertificate()
if err != nil {
log.Logger().Fatal("Unable to generate server certificate", zap.Error(err))
}
err = wm.InstallWebhooks()
if err != nil {
log.Logger().Fatal("Unable to install webhooks for admission controller", zap.Error(err))
}
return certs
}
func CreateWebhook(ac *admissionController, port int) *WebHook {
return &WebHook{
ac: ac,
port: port,
}
}
func (wh *WebHook) Startup(certs *tls.Certificate) {
wh.Lock()
defer wh.Unlock()
mux := http.NewServeMux()
mux.HandleFunc(healthURL, wh.ac.health)
mux.HandleFunc(mutateURL, wh.ac.serve)
mux.HandleFunc(validateConfURL, wh.ac.serve)
wh.server = &http.Server{
Addr: fmt.Sprintf(":%v", wh.port),
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{*certs}},
Handler: mux,
}
go func() {
if err := wh.server.ListenAndServeTLS("", ""); err != nil {
if err == http.ErrServerClosed {
log.Logger().Info("existing server closed")
} else {
log.Logger().Fatal("failed to start admission controller", zap.Error(err))
}
}
}()
log.Logger().Info("the admission controller started",
zap.Int("port", HTTPPort),
zap.Strings("listeningOn", []string{healthURL, mutateURL, validateConfURL}))
}
func (wh *WebHook) Shutdown() {
wh.Lock()
defer wh.Unlock()
if wh.server != nil {
log.Logger().Info("shutting down the admission controller...")
err := wh.server.Shutdown(context.Background())
if err != nil {
log.Logger().Fatal("failed to stop the admission controller", zap.Error(err))
}
wh.server = nil
}
}