blob: 5a1375f43ab6c1fedafd304a445a147847263e3c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package util
import (
"fmt"
"strings"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
controllerconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
)
// AddOwnerReference add OwnerReference to an object.
func AddOwnerReference(meta *metav1.ObjectMeta, rss *v1alpha1.RemoteShuffleService) {
meta.OwnerReferences = append(meta.OwnerReferences, generateOwnerReference(rss)...)
}
// generateOwnerReference builds an OwnerReference for rss objects.
func generateOwnerReference(rss *v1alpha1.RemoteShuffleService) []metav1.OwnerReference {
return []metav1.OwnerReference{
{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
// Use a const instead of rss.Kind as it maybe empty, this is odd.
Kind: constants.RSSKind,
BlockOwnerDeletion: pointer.Bool(true),
Name: rss.Name,
UID: rss.UID,
Controller: pointer.Bool(true),
},
}
}
// GenerateMainContainer generate main container of coordinators or shuffle servers.
func GenerateMainContainer(name, configDir string, podSpec *v1alpha1.RSSPodSpec,
ports []corev1.ContainerPort, env []corev1.EnvVar,
initVolumeMounts []corev1.VolumeMount) *corev1.Container {
mainContainer := &corev1.Container{
Name: name,
Image: podSpec.Image,
Command: podSpec.Command,
Args: podSpec.Args,
ImagePullPolicy: corev1.PullAlways,
Resources: podSpec.Resources,
Ports: ports,
Env: env,
VolumeMounts: []corev1.VolumeMount{
{
Name: controllerconstants.ConfigurationVolumeName,
MountPath: configDir,
},
},
}
mainContainer.VolumeMounts = append(mainContainer.VolumeMounts, initVolumeMounts...)
addVolumeMountsOfMainContainer(mainContainer, podSpec.HostPathMounts, podSpec.VolumeMounts)
return mainContainer
}
func addVolumeMountsOfMainContainer(mainContainer *corev1.Container,
hostPathMounts map[string]string, volumeMounts []corev1.VolumeMount) {
var clearPathCMDs []string
mainContainer.VolumeMounts = append(mainContainer.VolumeMounts,
GenerateHostPathVolumeMounts(hostPathMounts)...)
for _, mountPath := range hostPathMounts {
clearPathCMDs = append(clearPathCMDs, fmt.Sprintf("rm -rf %v/%v/*",
strings.TrimSuffix(mountPath, "/"), controllerconstants.RssDataDir))
}
if len(clearPathCMDs) > 0 {
mainContainer.Lifecycle = &corev1.Lifecycle{
PreStop: &corev1.Handler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", strings.Join(clearPathCMDs, ";")},
},
},
}
}
mainContainer.VolumeMounts = append(mainContainer.VolumeMounts, volumeMounts...)
}
// GenerateHostPathVolumeMounts generates volume mounts for hostPaths configured in rss objects.
func GenerateHostPathVolumeMounts(hostPathMounts map[string]string) []corev1.VolumeMount {
var volumeMounts []corev1.VolumeMount
for hostPath, mountPath := range hostPathMounts {
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: GenerateHostPathVolumeName(hostPath),
MountPath: mountPath,
})
}
return volumeMounts
}
// GenerateHostPathVolumes generates all hostPath volumes. If logHostPath is not empty, we need to handle it in a
// special way by adding the subPath to the hostPath which is the same as logHostPath.
func GenerateHostPathVolumes(hostPathMounts map[string]string, logHostPath, subPath string) []corev1.Volume {
volumes := make([]corev1.Volume, 0)
for hostPath := range hostPathMounts {
if len(hostPath) == 0 {
continue
}
if hostPath == logHostPath {
volumes = append(volumes, *GenerateHostPathVolume(hostPath, subPath))
} else {
volumes = append(volumes, *GenerateHostPathVolume(hostPath, ""))
}
}
return volumes
}
// GenerateHostPathVolume convert host path to hostPath volume.
func GenerateHostPathVolume(hostPath, subPath string) *corev1.Volume {
path := hostPath
if len(subPath) > 0 {
path = fmt.Sprintf("%v/%v", strings.TrimSuffix(path, "/"), subPath)
}
hostPathType := corev1.HostPathDirectoryOrCreate
return &corev1.Volume{
Name: GenerateHostPathVolumeName(hostPath),
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: path,
Type: &hostPathType,
},
},
}
}
// GenerateHostPathVolumeName converts host path to volume name.
func GenerateHostPathVolumeName(hostPath string) string {
hostPath = strings.TrimPrefix(hostPath, "/")
hostPath = strings.TrimSuffix(hostPath, "/")
hostPath = strings.ReplaceAll(hostPath, "/", "-")
return hostPath
}
// GenerateInitContainers generates init containers for coordinators and shuffle servers.
func GenerateInitContainers(rssPodSpec *v1alpha1.RSSPodSpec) []corev1.Container {
var initContainers []corev1.Container
if rssPodSpec.SecurityContext == nil || rssPodSpec.SecurityContext.FSGroup == nil {
return initContainers
}
if len(rssPodSpec.HostPathMounts) > 0 {
image := rssPodSpec.InitContainerImage
if len(image) == 0 {
image = constants.DefaultInitContainerImage
}
commands := generateMakeDataDirCommand(rssPodSpec)
initContainers = append(initContainers, corev1.Container{
Name: "init-data-dir",
Image: image,
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{"sh", "-c", strings.Join(commands, ";")},
SecurityContext: &corev1.SecurityContext{
RunAsUser: pointer.Int64(0),
Privileged: pointer.Bool(true),
},
VolumeMounts: GenerateHostPathVolumeMounts(rssPodSpec.HostPathMounts),
Resources: rssPodSpec.Resources,
})
if len(rssPodSpec.LogHostPath) > 0 {
initContainers = append(initContainers, corev1.Container{
Name: "init-log-dir",
Image: image,
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{
"sh", "-c",
fmt.Sprintf("mkdir -p %v && chmod -R 777 %v", rssPodSpec.LogHostPath,
rssPodSpec.LogHostPath),
},
SecurityContext: &corev1.SecurityContext{
RunAsUser: pointer.Int64(0),
Privileged: pointer.Bool(true),
},
VolumeMounts: []corev1.VolumeMount{
*generateLogVolumeMount(rssPodSpec),
},
Resources: rssPodSpec.Resources,
})
}
}
return initContainers
}
func generateLogVolumeMount(rssPodSpec *v1alpha1.RSSPodSpec) *corev1.VolumeMount {
logHostPath := rssPodSpec.LogHostPath
return &corev1.VolumeMount{
Name: GenerateHostPathVolumeName(logHostPath),
MountPath: rssPodSpec.HostPathMounts[logHostPath],
}
}
func generateMakeDataDirCommand(rssPodSpec *v1alpha1.RSSPodSpec) []string {
var commands []string
var runAsUser int64
if rssPodSpec.SecurityContext != nil && rssPodSpec.SecurityContext.RunAsUser != nil {
runAsUser = *rssPodSpec.SecurityContext.RunAsUser
}
var fsGroup int64
if rssPodSpec.SecurityContext != nil && rssPodSpec.SecurityContext.FSGroup != nil {
fsGroup = *rssPodSpec.SecurityContext.FSGroup
}
for _, mountPath := range rssPodSpec.HostPathMounts {
commands = append(commands, fmt.Sprintf("chown -R %v:%v %v", runAsUser, fsGroup, mountPath))
}
return commands
}
// CreateRecorder creates an event recorder.
func CreateRecorder(kubeClient kubernetes.Interface, component string) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: kubeClient.CoreV1().Events(utils.GetCurrentNamespace()),
})
return eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: component})
}
// GetConfigMapOwner returns name of rss object as owner of configMap.
func GetConfigMapOwner(cm *corev1.ConfigMap) string {
return cm.Labels[controllerconstants.OwnerLabel]
}