blob: b51710b1a6a5647257b5e52aed1daa745bd19cdf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package controllers
import (
"context"
dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
func (r *DSWorkerReconciler) podMemberSet(ctx context.Context, cluster *dsv1alpha1.DSWorker) (MemberSet, error) {
members := MemberSet{}
pods := &corev1.PodList{}
if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace),
client.MatchingLabels(LabelsForCluster(dsv1alpha1.DsWorkerLabel))); err != nil {
return members, err
}
if len(pods.Items) > 0 {
for _, pod := range pods.Items {
if pod.ObjectMeta.DeletionTimestamp.IsZero() {
m := &Member{
Name: pod.Name,
Namespace: pod.Namespace,
Created: true,
Version: pod.Labels[dsv1alpha1.DsVersionLabel],
Phase: string(pod.Status.Phase),
RunningAndReady: IsRunningAndReady(&pod),
}
members.Add(m)
}
}
}
return members, nil
}
func newDSWorkerPod(cr *dsv1alpha1.DSWorker) *corev1.Pod {
var podName = cr.Name + "-pod" + dsv1alpha1.RandStr(6)
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: cr.Namespace,
Labels: map[string]string{dsv1alpha1.DsAppName: dsv1alpha1.DsWorkerLabel,
dsv1alpha1.DsVersionLabel: cr.Spec.Version,
dsv1alpha1.DsServiceLabel: dsv1alpha1.DsServiceLabelValue,
},
},
Spec: corev1.PodSpec{
Hostname: podName,
Subdomain: dsv1alpha1.DsServiceLabelValue,
ServiceAccountName: dsv1alpha1.DsServiceAccount,
Containers: []corev1.Container{
{
Name: cr.Name,
Image: ImageName(cr.Spec.Repository, cr.Spec.Version),
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{{
Name: dsv1alpha1.EnvZookeeper,
Value: cr.Spec.ZookeeperConnect,
}, {
Name: dsv1alpha1.DataSourceDriveName,
Value: cr.Spec.Datasource.DriveName,
},
{
Name: dsv1alpha1.DataSourceUrl,
Value: cr.Spec.Datasource.Url,
},
{
Name: dsv1alpha1.DataSourceUserName,
Value: cr.Spec.Datasource.UserName,
},
{
Name: dsv1alpha1.DataSourcePassWord,
Value: cr.Spec.Datasource.Password,
},
},
Command: []string{
"/bin/sh", "-c",
},
Args: []string{"sed -i 's/alert-listen-host: localhost/alert-listen-host: $(DS_ALERT_SERVICE_SERVICE_HOST)/g' conf/application.yaml ;" +
" sed -i 's/50052/$(DS_ALERT_SERVICE_SERVICE_PORT)/g' conf/application.yaml ; " +
"./bin/start.sh"},
},
},
},
}
}
func (r *DSWorkerReconciler) newDSWorkerPod(ctx context.Context, cluster *dsv1alpha1.DSWorker) (*corev1.Pod, error) {
// Create pod
pod := newDSWorkerPod(cluster)
if err := controllerutil.SetControllerReference(cluster, pod, r.Scheme); err != nil {
return nil, err
}
AddLogVolumeToPod(pod, cluster.Spec.LogPvcName)
AddLibVolumeToPod(pod, cluster.Spec.LibPvcName)
applyPodPolicy(pod, cluster.Spec.Pod)
return pod, nil
}