blob: 955185cdc2627ade2188f86cfb3a160dc1ba790e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator.service;
import org.apache.flink.client.cli.ApplicationDeployer;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
/**
* Implementation of {@link FlinkService} submitting and interacting with Native Kubernetes Flink
* clusters and jobs.
*/
public class NativeFlinkService extends AbstractFlinkService {

    private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);

    public NativeFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
        super(kubernetesClient, configManager);
    }

    /**
     * Deploys an application cluster natively on Kubernetes using Flink's standard
     * {@link ApplicationClusterDeployer}.
     *
     * @param jobSpec job spec whose args and entry class are forwarded to the application
     * @param conf effective Flink configuration for the deployment
     * @throws Exception if the deployment fails
     */
    @Override
    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
        LOG.info("Deploying application cluster");
        // Normalize null args to an empty array before building the application config.
        String[] jobArgs = jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0];
        ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(jobArgs, jobSpec.getEntryClass());
        ApplicationDeployer deployer =
                new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader());
        deployer.run(conf, applicationConfiguration);
        LOG.info("Application cluster successfully deployed");
    }

    @Override
    public void submitSessionCluster(Configuration conf) throws Exception {
        // Operator-internal config keys are stripped so they don't leak into the cluster config.
        Configuration clusterConf = removeOperatorConfigs(conf);
        submitClusterInternal(clusterConf);
    }

    @Override
    public void cancelJob(
            FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration)
            throws Exception {
        // Before Flink 1.15 the cluster is deleted after the savepoint completes so that
        // orphaned HA config maps get cleaned up (FLINK-30004).
        FlinkVersion flinkVersion = deployment.getSpec().getFlinkVersion();
        boolean deleteClusterAfterSavepoint = !flinkVersion.isNewerVersionThan(FlinkVersion.v1_14);
        cancelJob(deployment, upgradeMode, configuration, deleteClusterAfterSavepoint);
    }

    /** Lists the JobManager pods of the given cluster in the given namespace. */
    @Override
    protected PodList getJmPodList(String namespace, String clusterId) {
        return kubernetesClient
                .pods()
                .inNamespace(namespace)
                .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
                .list();
    }

    /**
     * Deploys a session cluster with the given (already sanitized) configuration.
     *
     * @param conf Flink configuration used to create the cluster descriptor and spec
     * @throws Exception if the deployment fails
     */
    protected void submitClusterInternal(Configuration conf) throws Exception {
        LOG.info("Deploying session cluster");
        ClusterClientFactory<String> clientFactory =
                new DefaultClusterClientServiceLoader().getClusterClientFactory(conf);
        // The descriptor is AutoCloseable; close it once deployment has been triggered.
        try (ClusterDescriptor<String> descriptor = clientFactory.createClusterDescriptor(conf)) {
            descriptor.deploySessionCluster(clientFactory.getClusterSpecification(conf));
        }
        LOG.info("Session cluster successfully deployed");
    }

    /**
     * Deletes the JobManager deployment of the cluster identified by {@code meta} and,
     * optionally, its HA config maps.
     *
     * @param meta metadata carrying the namespace and cluster id (resource name)
     * @param deleteHaConfigmaps whether HA config maps should be removed as well
     */
    @Override
    protected void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps) {
        String namespace = meta.getNamespace();
        String clusterId = meta.getName();
        LOG.info(
                "Deleting JobManager deployment {}.",
                deleteHaConfigmaps ? "and HA metadata" : "while preserving HA metadata");
        kubernetesClient
                .apps()
                .deployments()
                .inNamespace(namespace)
                .withName(KubernetesUtils.getDeploymentName(clusterId))
                .delete();
        if (!deleteHaConfigmaps) {
            return;
        }
        // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
        long shutdownTimeoutSeconds =
                configManager
                        .getOperatorConfiguration()
                        .getFlinkShutdownClusterTimeout()
                        .toSeconds();
        waitForClusterShutdown(namespace, clusterId, shutdownTimeoutSeconds);
        kubernetesClient
                .configMaps()
                .inNamespace(namespace)
                .withLabels(
                        KubernetesUtils.getConfigMapLabels(
                                clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
                .delete();
    }
}