| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.kubernetes.operator.service; |
| |
| import org.apache.flink.client.cli.ApplicationDeployer; |
| import org.apache.flink.client.deployment.ClusterClientFactory; |
| import org.apache.flink.client.deployment.ClusterClientServiceLoader; |
| import org.apache.flink.client.deployment.ClusterDescriptor; |
| import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; |
| import org.apache.flink.client.deployment.application.ApplicationConfiguration; |
| import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.kubernetes.operator.api.FlinkDeployment; |
| import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; |
| import org.apache.flink.kubernetes.operator.api.spec.JobSpec; |
| import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; |
| import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; |
| import org.apache.flink.kubernetes.utils.KubernetesUtils; |
| |
| import io.fabric8.kubernetes.api.model.ObjectMeta; |
| import io.fabric8.kubernetes.api.model.PodList; |
| import io.fabric8.kubernetes.client.KubernetesClient; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY; |
| |
| /** |
| * Implementation of {@link FlinkService} submitting and interacting with Native Kubernetes Flink |
| * clusters and jobs. |
| */ |
| public class NativeFlinkService extends AbstractFlinkService { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class); |
| |
| public NativeFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) { |
| super(kubernetesClient, configManager); |
| } |
| |
| @Override |
| protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception { |
| LOG.info("Deploying application cluster"); |
| final ClusterClientServiceLoader clusterClientServiceLoader = |
| new DefaultClusterClientServiceLoader(); |
| final ApplicationDeployer deployer = |
| new ApplicationClusterDeployer(clusterClientServiceLoader); |
| |
| final ApplicationConfiguration applicationConfiguration = |
| new ApplicationConfiguration( |
| jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0], |
| jobSpec.getEntryClass()); |
| |
| deployer.run(conf, applicationConfiguration); |
| LOG.info("Application cluster successfully deployed"); |
| } |
| |
| @Override |
| public void submitSessionCluster(Configuration conf) throws Exception { |
| submitClusterInternal(removeOperatorConfigs(conf)); |
| } |
| |
| @Override |
| public void cancelJob( |
| FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration) |
| throws Exception { |
| // prior to Flink 1.15, ensure removal of orphaned config maps |
| // https://issues.apache.org/jira/browse/FLINK-30004 |
| boolean deleteClusterAfterSavepoint = |
| !deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14); |
| cancelJob(deployment, upgradeMode, configuration, deleteClusterAfterSavepoint); |
| } |
| |
| @Override |
| protected PodList getJmPodList(String namespace, String clusterId) { |
| return kubernetesClient |
| .pods() |
| .inNamespace(namespace) |
| .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)) |
| .list(); |
| } |
| |
| protected void submitClusterInternal(Configuration conf) throws Exception { |
| LOG.info("Deploying session cluster"); |
| final ClusterClientServiceLoader clusterClientServiceLoader = |
| new DefaultClusterClientServiceLoader(); |
| final ClusterClientFactory<String> kubernetesClusterClientFactory = |
| clusterClientServiceLoader.getClusterClientFactory(conf); |
| try (final ClusterDescriptor<String> kubernetesClusterDescriptor = |
| kubernetesClusterClientFactory.createClusterDescriptor(conf)) { |
| kubernetesClusterDescriptor.deploySessionCluster( |
| kubernetesClusterClientFactory.getClusterSpecification(conf)); |
| } |
| LOG.info("Session cluster successfully deployed"); |
| } |
| |
| @Override |
| protected void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps) { |
| |
| String namespace = meta.getNamespace(); |
| String clusterId = meta.getName(); |
| |
| LOG.info( |
| "Deleting JobManager deployment {}.", |
| deleteHaConfigmaps ? "and HA metadata" : "while preserving HA metadata"); |
| kubernetesClient |
| .apps() |
| .deployments() |
| .inNamespace(namespace) |
| .withName(KubernetesUtils.getDeploymentName(clusterId)) |
| .delete(); |
| |
| if (deleteHaConfigmaps) { |
| // We need to wait for cluster shutdown otherwise HA configmaps might be recreated |
| waitForClusterShutdown( |
| namespace, |
| clusterId, |
| configManager |
| .getOperatorConfiguration() |
| .getFlinkShutdownClusterTimeout() |
| .toSeconds()); |
| kubernetesClient |
| .configMaps() |
| .inNamespace(namespace) |
| .withLabels( |
| KubernetesUtils.getConfigMapLabels( |
| clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)) |
| .delete(); |
| } |
| } |
| } |