blob: a0370eaf03040e1762042dad6ab65f5d8bbae7bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"context"
"fmt"
v1alpha1 "github.com/apache/submarine/submarine-cloud-v2/pkg/apis/submarine/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)
func newSubmarineServerService(submarine *v1alpha1.Submarine) *corev1.Service {
service, err := ParseServiceYaml(serverYamlPath)
if err != nil {
klog.Info("[Error] ParseServiceYaml", err)
}
service.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
*metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
}
return service
}
func newSubmarineServerDeployment(submarine *v1alpha1.Submarine) *appsv1.Deployment {
serverImage := submarine.Spec.Server.Image
serverReplicas := *submarine.Spec.Server.Replicas
ownerReference := *metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine"))
operatorEnv := []corev1.EnvVar{
{
Name: "SUBMARINE_SERVER_DNS_NAME",
Value: serverName + "." + submarine.Namespace,
},
{
Name: "ENV_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "SUBMARINE_APIVERSION",
Value: ownerReference.APIVersion,
},
{
Name: "SUBMARINE_KIND",
Value: ownerReference.Kind,
},
{
Name: "SUBMARINE_NAME",
Value: ownerReference.Name,
},
{
Name: "SUBMARINE_UID",
Value: string(ownerReference.UID),
},
}
deployment, err := ParseDeploymentYaml(serverYamlPath)
if err != nil {
klog.Info("[Error] ParseDeploymentYaml", err)
}
deployment.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
ownerReference,
}
if serverImage != "" {
deployment.Spec.Template.Spec.Containers[0].Image = serverImage
}
deployment.Spec.Replicas = &serverReplicas
deployment.Spec.Template.Spec.Containers[0].Env = append(deployment.Spec.Template.Spec.Containers[0].Env, operatorEnv...)
return deployment
}
// createSubmarineServer is a function to create submarine-server.
// Reference: https://github.com/apache/submarine/blob/master/helm-charts/submarine/templates/submarine-server.yaml
func (c *Controller) createSubmarineServer(submarine *v1alpha1.Submarine) error {
klog.Info("[createSubmarineServer]")
// Step1: Create Service
service, err := c.serviceLister.Services(submarine.Namespace).Get(serverName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
service, err = c.kubeclientset.CoreV1().Services(submarine.Namespace).Create(context.TODO(), newSubmarineServerService(submarine), metav1.CreateOptions{})
klog.Info(" Create Service: ", service.Name)
}
// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
if !metav1.IsControlledBy(service, submarine) {
msg := fmt.Sprintf(MessageResourceExists, service.Name)
c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
// Step2: Create Deployment
deployment, err := c.deploymentLister.Deployments(submarine.Namespace).Get(serverName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(submarine.Namespace).Create(context.TODO(), newSubmarineServerDeployment(submarine), metav1.CreateOptions{})
klog.Info(" Create Deployment: ", deployment.Name)
}
// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
if !metav1.IsControlledBy(deployment, submarine) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
// Update the replicas of the server deployment if it is not equal to spec
if submarine.Spec.Server.Replicas != nil && *submarine.Spec.Server.Replicas != *deployment.Spec.Replicas {
klog.V(4).Infof("Submarine %s server spec replicas: %d, actual replicas: %d", submarine.Name, *submarine.Spec.Server.Replicas, *deployment.Spec.Replicas)
_, err = c.kubeclientset.AppsV1().Deployments(submarine.Namespace).Update(context.TODO(), newSubmarineServerDeployment(submarine), metav1.UpdateOptions{})
}
if err != nil {
return err
}
return nil
}