| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package controllers |
| |
| import ( |
| "context" |
| dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" |
| |
| corev1 "k8s.io/api/core/v1" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "sigs.k8s.io/controller-runtime/pkg/client" |
| "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" |
| ) |
| |
| func (r *DSWorkerReconciler) podMemberSet(ctx context.Context, cluster *dsv1alpha1.DSWorker) (MemberSet, error) { |
| members := MemberSet{} |
| pods := &corev1.PodList{} |
| |
| if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace), |
| client.MatchingLabels(LabelsForCluster(dsv1alpha1.DsWorkerLabel))); err != nil { |
| return members, err |
| } |
| |
| if len(pods.Items) > 0 { |
| for _, pod := range pods.Items { |
| if pod.ObjectMeta.DeletionTimestamp.IsZero() { |
| m := &Member{ |
| Name: pod.Name, |
| Namespace: pod.Namespace, |
| Created: true, |
| Version: pod.Labels[dsv1alpha1.DsVersionLabel], |
| Phase: string(pod.Status.Phase), |
| RunningAndReady: IsRunningAndReady(&pod), |
| } |
| members.Add(m) |
| } |
| } |
| } |
| |
| return members, nil |
| } |
| |
| func newDSWorkerPod(cr *dsv1alpha1.DSWorker) *corev1.Pod { |
| var podName = cr.Name + "-pod" + dsv1alpha1.RandStr(6) |
| return &corev1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: podName, |
| Namespace: cr.Namespace, |
| Labels: map[string]string{dsv1alpha1.DsAppName: dsv1alpha1.DsWorkerLabel, |
| dsv1alpha1.DsVersionLabel: cr.Spec.Version, |
| dsv1alpha1.DsServiceLabel: dsv1alpha1.DsServiceLabelValue, |
| }, |
| }, |
| Spec: corev1.PodSpec{ |
| Hostname: podName, |
| Subdomain: dsv1alpha1.DsServiceLabelValue, |
| ServiceAccountName: dsv1alpha1.DsServiceAccount, |
| Containers: []corev1.Container{ |
| { |
| Name: cr.Name, |
| Image: ImageName(cr.Spec.Repository, cr.Spec.Version), |
| ImagePullPolicy: corev1.PullIfNotPresent, |
| Env: []corev1.EnvVar{{ |
| Name: dsv1alpha1.EnvZookeeper, |
| Value: cr.Spec.ZookeeperConnect, |
| }, { |
| Name: dsv1alpha1.DataSourceDriveName, |
| Value: cr.Spec.Datasource.DriveName, |
| }, |
| { |
| Name: dsv1alpha1.DataSourceUrl, |
| Value: cr.Spec.Datasource.Url, |
| }, |
| { |
| Name: dsv1alpha1.DataSourceUserName, |
| Value: cr.Spec.Datasource.UserName, |
| }, |
| { |
| Name: dsv1alpha1.DataSourcePassWord, |
| Value: cr.Spec.Datasource.Password, |
| }, |
| }, |
| Command: []string{ |
| "/bin/sh", "-c", |
| }, |
| Args: []string{"sed -i 's/alert-listen-host: localhost/alert-listen-host: $(DS_ALERT_SERVICE_SERVICE_HOST)/g' conf/application.yaml ;" + |
| " sed -i 's/50052/$(DS_ALERT_SERVICE_SERVICE_PORT)/g' conf/application.yaml ; " + |
| "./bin/start.sh"}, |
| }, |
| }, |
| }, |
| } |
| } |
| |
| func (r *DSWorkerReconciler) newDSWorkerPod(ctx context.Context, cluster *dsv1alpha1.DSWorker) (*corev1.Pod, error) { |
| // Create pod |
| pod := newDSWorkerPod(cluster) |
| if err := controllerutil.SetControllerReference(cluster, pod, r.Scheme); err != nil { |
| return nil, err |
| } |
| AddLogVolumeToPod(pod, cluster.Spec.LogPvcName) |
| AddLibVolumeToPod(pod, cluster.Spec.LibPvcName) |
| applyPodPolicy(pod, cluster.Spec.Pod) |
| return pod, nil |
| } |