blob: 1a9f45cf671acc42061bc0d8d84c55c50f57d740 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package controllers
import (
"context"
dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
v1 "k8s.io/api/rbac/v1"
"k8s.io/api/autoscaling/v2beta2"
_ "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// podMemberSet lists the pods in cluster.Namespace that match the labels
// returned by LabelsForCluster(dsv1alpha1.DsMasterLabel) and converts them
// into a MemberSet.
//
// Pods whose DeletionTimestamp is set are skipped. When the list call fails,
// the (still empty) set is returned together with the error.
func (r *DSMasterReconciler) podMemberSet(ctx context.Context, cluster *dsv1alpha1.DSMaster) (MemberSet, error) {
	members := MemberSet{}

	podList := &corev1.PodList{}
	listOpts := []client.ListOption{
		client.InNamespace(cluster.Namespace),
		client.MatchingLabels(LabelsForCluster(dsv1alpha1.DsMasterLabel)),
	}
	if err := r.Client.List(ctx, podList, listOpts...); err != nil {
		return members, err
	}

	for i := range podList.Items {
		pod := &podList.Items[i]
		// A non-zero deletion timestamp means the pod is being removed.
		if !pod.ObjectMeta.DeletionTimestamp.IsZero() {
			continue
		}
		members.Add(&Member{
			Name:            pod.Name,
			Namespace:       pod.Namespace,
			Created:         true,
			Version:         pod.Labels[dsv1alpha1.DsVersionLabel],
			Phase:           string(pod.Status.Phase),
			RunningAndReady: IsRunningAndReady(pod),
		})
	}
	return members, nil
}
// newDSMasterPod builds the Pod object for a DSMaster custom resource.
//
// The pod name is cr.Name + "-pod" followed by the result of
// dsv1alpha1.RandStr(6); the same value is used as the pod hostname. The
// single container runs the image resolved by ImageName from the spec's
// repository and version, and receives the ZooKeeper connection string and
// the datasource settings from the spec as environment variables.
func newDSMasterPod(cr *dsv1alpha1.DSMaster) *corev1.Pod {
	podName := cr.Name + "-pod" + dsv1alpha1.RandStr(6)

	// PodSpec.SetHostnameAsFQDN is a *bool, so an addressable value is needed.
	hostnameAsFQDN := true

	podLabels := map[string]string{
		dsv1alpha1.DsAppName:      dsv1alpha1.DsMasterLabel,
		dsv1alpha1.DsVersionLabel: cr.Spec.Version,
		dsv1alpha1.DsServiceLabel: dsv1alpha1.DsServiceLabelValue,
	}

	masterContainer := corev1.Container{
		Name:            cr.Name,
		Image:           ImageName(cr.Spec.Repository, cr.Spec.Version),
		ImagePullPolicy: corev1.PullIfNotPresent,
		Env: []corev1.EnvVar{
			{Name: dsv1alpha1.EnvZookeeper, Value: cr.Spec.ZookeeperConnect},
			{Name: dsv1alpha1.DataSourceDriveName, Value: cr.Spec.Datasource.DriveName},
			{Name: dsv1alpha1.DataSourceUrl, Value: cr.Spec.Datasource.Url},
			{Name: dsv1alpha1.DataSourceUserName, Value: cr.Spec.Datasource.UserName},
			{Name: dsv1alpha1.DataSourcePassWord, Value: cr.Spec.Datasource.Password},
		},
	}

	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      podName,
			Namespace: cr.Namespace,
			Labels:    podLabels,
		},
		Spec: corev1.PodSpec{
			Hostname:           podName,
			Subdomain:          dsv1alpha1.DsServiceLabelValue,
			SetHostnameAsFQDN:  &hostnameAsFQDN,
			ServiceAccountName: dsv1alpha1.DsServiceAccount,
			Containers:         []corev1.Container{masterContainer},
		},
	}
}
// ensureDSMasterDeleted asks the API server to delete the given DSMaster
// using the background propagation policy, and returns the client's error
// (nil on success) unchanged.
func (r *DSMasterReconciler) ensureDSMasterDeleted(ctx context.Context, DSMaster *dsv1alpha1.DSMaster) error {
	background := client.PropagationPolicy(metav1.DeletePropagationBackground)
	return r.Client.Delete(ctx, DSMaster, background)
}
// newDSMasterPod builds the pod for the given DSMaster, registers the
// DSMaster as the pod's controller owner, and then applies the log volume
// (cluster.Spec.LogPvcName) and pod policy (cluster.Spec.Pod) from the spec.
//
// It returns a nil pod and the error if the controller reference cannot be set.
func (r *DSMasterReconciler) newDSMasterPod(cluster *dsv1alpha1.DSMaster) (*corev1.Pod, error) {
	masterPod := newDSMasterPod(cluster)

	err := controllerutil.SetControllerReference(cluster, masterPod, r.Scheme)
	if err != nil {
		return nil, err
	}

	AddLogVolumeToPod(masterPod, cluster.Spec.LogPvcName)
	applyPodPolicy(masterPod, cluster.Spec.Pod)
	return masterPod, nil
}
// createMasterService builds a headless Service (ClusterIP set to
// corev1.ClusterIPNone) in the cluster's namespace. The service selects pods
// by the labels returned from LabelsForService and is configured to publish
// addresses of pods that are not ready.
func createMasterService(cluster *dsv1alpha1.DSMaster) *corev1.Service {
	podSelector := LabelsForService()

	meta := metav1.ObjectMeta{
		Name:      dsv1alpha1.DsHeadLessServiceLabel,
		Namespace: cluster.Namespace,
		Labels: map[string]string{
			dsv1alpha1.DsAppName: dsv1alpha1.DsHeadLessServiceLabel,
		},
	}

	return &corev1.Service{
		ObjectMeta: meta,
		Spec: corev1.ServiceSpec{
			Selector:                 podSelector,
			ClusterIP:                corev1.ClusterIPNone,
			PublishNotReadyAddresses: true,
		},
	}
}
// createHPA builds the HorizontalPodAutoscaler described by
// cluster.Spec.HpaPolicy. The HPA is named dsv1alpha1.DsWorkerHpa, lives in
// the cluster's namespace and targets the object identified by
// dsv1alpha1.DsWorkerKind / dsv1alpha1.DsWorkerLabel / dsv1alpha1.APIVersion.
//
// A CPU and/or memory utilization metric is added only when the respective
// CPUAverageUtilization / MEMAverageUtilization value in the policy is
// greater than zero.
//
// The returned object is only constructed here; it is not sent to the API
// server by this function.
func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2beta2.HorizontalPodAutoscaler {
	// Copy the policy values so the pointers stored in the HPA do not alias
	// fields of the DSMaster spec; otherwise mutating the HPA would silently
	// modify the custom resource.
	minReplicas := cluster.Spec.HpaPolicy.MinReplicas
	cpuUtilization := cluster.Spec.HpaPolicy.CPUAverageUtilization
	memUtilization := cluster.Spec.HpaPolicy.MEMAverageUtilization

	hpa := &v2beta2.HorizontalPodAutoscaler{
		// ResourceVersion is intentionally left unset: it is an opaque value
		// populated by the API server and must not be supplied by the client
		// on a newly built object.
		ObjectMeta: metav1.ObjectMeta{
			Name:      dsv1alpha1.DsWorkerHpa,
			Namespace: cluster.Namespace,
		},
		Spec: v2beta2.HorizontalPodAutoscalerSpec{
			ScaleTargetRef: v2beta2.CrossVersionObjectReference{
				Kind:       dsv1alpha1.DsWorkerKind,
				Name:       dsv1alpha1.DsWorkerLabel,
				APIVersion: dsv1alpha1.APIVersion,
			},
			MinReplicas: &minReplicas,
			MaxReplicas: cluster.Spec.HpaPolicy.MaxReplicas,
		},
	}

	// addUtilizationMetric appends an average-utilization target for the given
	// resource. The target is passed by value so each metric gets its own
	// pointer.
	addUtilizationMetric := func(resource corev1.ResourceName, target int32) {
		hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2beta2.MetricSpec{
			Type: v2beta2.ResourceMetricSourceType,
			Resource: &v2beta2.ResourceMetricSource{
				Name: resource,
				Target: v2beta2.MetricTarget{
					Type:               v2beta2.UtilizationMetricType,
					AverageUtilization: &target,
				},
			},
		})
	}

	if cpuUtilization > 0 {
		addUtilizationMetric(corev1.ResourceCPU, cpuUtilization)
	}
	if memUtilization > 0 {
		addUtilizationMetric(corev1.ResourceMemory, memUtilization)
	}
	return hpa
}
// deleteHPA asks the API server to delete the given HorizontalPodAutoscaler
// using the orphan propagation policy, and returns the client's error (nil
// on success) unchanged.
func (r *DSMasterReconciler) deleteHPA(ctx context.Context, hpa *v2beta2.HorizontalPodAutoscaler) error {
	orphan := client.PropagationPolicy(metav1.DeletePropagationOrphan)
	return r.Client.Delete(ctx, hpa, orphan)
}
// createRole builds the RBAC Role named dsv1alpha1.DsRole in the cluster's
// namespace. The role carries a single rule granting "get" and "watch" on
// configmaps in the core ("") API group.
func (r *DSMasterReconciler) createRole(cluster *dsv1alpha1.DSMaster) *v1.Role {
	configMapRule := v1.PolicyRule{
		Verbs:     []string{"get", "watch"},
		Resources: []string{"configmaps"},
		APIGroups: []string{""},
	}

	return &v1.Role{
		ObjectMeta: metav1.ObjectMeta{
			Name:      dsv1alpha1.DsRole,
			Namespace: cluster.Namespace,
			Labels: map[string]string{
				dsv1alpha1.DsAppName: dsv1alpha1.DsRole,
			},
		},
		Rules: []v1.PolicyRule{configMapRule},
	}
}
// createRoleBinding builds the RBAC RoleBinding in the cluster's namespace
// that binds the ServiceAccount dsv1alpha1.DsServiceAccount (same namespace)
// to the Role named dsv1alpha1.DsRole. The binding itself is also named
// dsv1alpha1.DsRole and is labelled with dsv1alpha1.DsRoleBinding.
func (r *DSMasterReconciler) createRoleBinding(cluster *dsv1alpha1.DSMaster) *v1.RoleBinding {
	serviceAccount := v1.Subject{
		Kind:      "ServiceAccount",
		Name:      dsv1alpha1.DsServiceAccount,
		Namespace: cluster.Namespace,
	}

	targetRole := v1.RoleRef{
		APIGroup: "rbac.authorization.k8s.io",
		Kind:     "Role",
		Name:     dsv1alpha1.DsRole,
	}

	return &v1.RoleBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name:      dsv1alpha1.DsRole,
			Namespace: cluster.Namespace,
			Labels: map[string]string{
				dsv1alpha1.DsAppName: dsv1alpha1.DsRoleBinding,
			},
		},
		Subjects: []v1.Subject{serviceAccount},
		RoleRef:  targetRole,
	}
}