/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.runtime.clusterframework;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.Constants;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.KubernetesConnectionManager;
import org.apache.flink.kubernetes.utils.KubernetesRMUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.TaskManagerResource;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.DynamicAssigningSlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
 * The Kubernetes implementation of the resource manager for session clusters. Used when
 * the system is started via the Kubernetes resource framework.
 */
public class KubernetesSessionResourceManager extends
ResourceManager<KubernetesWorkerNode> {
/**
* Kubernetes pod map. Package private for unit test purposes.
*/
final ConcurrentMap<ResourceID, KubernetesWorkerNode> workerNodeMap;
private final Configuration flinkConfig;
private ConfigMap tmConfigMap;
/**
* Connection manager to communicate with Kubernetes.
*/
private KubernetesConnectionManager kubernetesConnectionManager;
/** The requested number of task manager pods. */
private final int workerNum;
/** The pod requests that have been issued but not yet granted. */
private final Set<ResourceID> pendingWorkerNodes;
private final Time taskManagerRegisterTimeout;
private final TaskManagerResource taskManagerResource;
private Watch watcher;
private final String clusterId;
private final Map<String, String> taskManagerPodLabels;
private final String taskManagerPodNamePrefix;
private final String taskManagerConfigMapName;
private final AtomicLong maxPodId = new AtomicLong(0);
private final String confDir;
private volatile boolean isStopped;
private final int workerNodeMaxFailedAttempts;
private final AtomicInteger workerNodeFailedAttempts = new AtomicInteger(0);
private OwnerReference ownerReference;
private BiConsumer<Watcher.Action, Pod> podEventHandler;
private Consumer<Exception> watcherCloseHandler;
public KubernetesSessionResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
ResourceID resourceId,
Configuration flinkConfig,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler) {
super(
rpcService,
resourceManagerEndpointId,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
slotManager,
metricRegistry,
jobLeaderIdService,
clusterInformation,
fatalErrorHandler);
this.flinkConfig = flinkConfig;
this.workerNodeMap = new ConcurrentHashMap<>();
this.pendingWorkerNodes = new HashSet<>();
this.confDir = flinkConfig.getString(KubernetesConfigOptions.CONF_DIR);
taskManagerRegisterTimeout = Time.seconds(flinkConfig
.getLong(KubernetesConfigOptions.TASK_MANAGER_REGISTER_TIMEOUT));
workerNodeMaxFailedAttempts = flinkConfig.getInteger(
KubernetesConfigOptions.WORKER_NODE_MAX_FAILED_ATTEMPTS);
// build the task manager's total resource according to user's resource
taskManagerResource = TaskManagerResource.fromConfiguration(flinkConfig,
KubernetesRMUtils.createTaskManagerResourceProfile(flinkConfig), 1);
log.info("Task manager resource: " + taskManagerResource);
if (slotManager instanceof DynamicAssigningSlotManager) {
((DynamicAssigningSlotManager) slotManager).setTotalResourceOfTaskExecutor(
taskManagerResource.getTaskResourceProfile());
log.info("The resource for user in a task executor is {}.", taskManagerResource);
}
clusterId = flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
taskManagerPodLabels = new HashMap<>();
taskManagerPodLabels.put(Constants.LABEL_APP_KEY, clusterId);
taskManagerPodLabels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_TASK_MANAGER);
taskManagerPodNamePrefix =
clusterId + Constants.TASK_MANAGER_LABEL_SUFFIX + Constants.NAME_SEPARATOR;
taskManagerConfigMapName =
clusterId + Constants.TASK_MANAGER_CONFIG_MAP_SUFFIX;
workerNum =
flinkConfig.getInteger(KubernetesConfigOptions.TASK_MANAGER_COUNT);
log.info("Initialize KubernetesSessionResourceManager: clusterId: {}, "
+ "workerNum: {}", clusterId, workerNum);
}
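/**
 * Recovers the task manager pods started by a previous attempt of this resource manager,
 * re-registers them as worker nodes and restores the maximum pod id.
 */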
@VisibleForTesting
protected void getWorkerNodesFromPreviousAttempts() throws ResourceManagerException {
PodList podList = kubernetesConnectionManager.getPods(taskManagerPodLabels);
if (podList != null && !podList.getItems().isEmpty()) {
// add worker nodes
for (Pod pod : podList.getItems()) {
addWorkerNode(pod, false);
}
if (!workerNodeMap.isEmpty()) {
long maxId = workerNodeMap.values().stream()
.mapToLong(KubernetesWorkerNode::getPodId).max().getAsLong();
maxPodId.set(maxId);
}
log.info(
"Recovered {} pods from previous attempts, max pod id is {}.",
workerNodeMap.size(), maxPodId.get());
}
}
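/**
 * Registers a pod as a worker node. The pod is removed again if it is no longer pending
 * (when {@code checkPending} is set), already registered, exceeds the required number of
 * workers, or does not carry a valid pod name.
 */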
private synchronized KubernetesWorkerNode addWorkerNode(Pod pod, boolean checkPending)
throws ResourceManagerException {
String podName = pod.getMetadata().getName();
ResourceID resourceId = new ResourceID(podName);
boolean pendingRemoved = pendingWorkerNodes.remove(resourceId);
if (!pendingRemoved && checkPending) {
log.warn("Skip adding worker node {} since it's no longer pending!", resourceId);
kubernetesConnectionManager.removePod(pod);
return null;
}
if (workerNodeMap.containsKey(resourceId)) {
log.warn("Skip adding worker node {} since it's already exist!", resourceId);
return workerNodeMap.get(resourceId);
}
if (workerNodeMap.size() >= workerNum) {
log.error("Skip adding worker node {} since the number of worker nodes ({}) is equal with "
+ "or beyond required ({})", workerNodeMap.size(), workerNum);
kubernetesConnectionManager.removePod(pod);
return null;
}
if (podName.startsWith(taskManagerPodNamePrefix)) {
String podId = podName
.substring(podName.lastIndexOf(Constants.NAME_SEPARATOR) + 1);
if (StringUtils.isNumeric(podId)) {
KubernetesWorkerNode workerNode =
new KubernetesWorkerNode(pod, podName,
Long.parseLong(podId));
workerNodeMap.put(workerNode.getResourceID(), workerNode);
scheduleRunAsync(() -> checkTMRegistered(resourceId), taskManagerRegisterTimeout);
log.info("Add worker node : {}, worker nodes: {}, pending worker nodes: {} - {}",
workerNode.getResourceID(), workerNodeMap.size(), pendingWorkerNodes.size(), pendingWorkerNodes);
return workerNode;
} else {
log.warn("Skip adding invalid pod whose name is {} "
+ "and the last part is not a number.", podName);
kubernetesConnectionManager.removePod(pod);
}
} else {
log.warn("Skip adding invalid pod whose name is {} and prefix is not {}.",
podName, taskManagerPodNamePrefix);
kubernetesConnectionManager.removePod(pod);
}
return null;
}
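/**
 * Removes a worker node, closes its task manager connection, deletes the backing pod
 * and requests a replacement. Returns true if the worker node was known and removed.
 */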
private synchronized boolean removeWorkerNode(ResourceID resourceID, String diagnostics, boolean increaseFailedAttempt) {
if (!workerNodeMap.containsKey(resourceID)) {
log.info("Skip removing non-exist worker node {}.", resourceID);
return false;
}
log.info("Try to remove worker node: {}, diagnostics: {}", resourceID, diagnostics);
// If a worker terminated exceptionally, a replacement is requested below.
KubernetesWorkerNode node = workerNodeMap.remove(resourceID);
if (node != null) {
try {
if (increaseFailedAttempt) {
increaseWorkerNodeFailedAttempts();
}
closeTaskManagerConnection(resourceID, new Exception(diagnostics));
kubernetesConnectionManager.removePod(node.getPod());
} catch (Exception e) {
String fatalMsg = "Failed to remove worker node. Exiting, bye...";
onFatalError(new ResourceManagerException(fatalMsg, e));
}
checkWorkerNodeFailedAttempts();
requestWorkerNodes();
log.info("Removed worker node: {}, left worker nodes: {}", resourceID, workerNodeMap.size());
return true;
}
return false;
}
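/** Creates the connection manager used to talk to the Kubernetes API server. */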
protected KubernetesConnectionManager createKubernetesConnectionManager() {
return new KubernetesConnectionManager(flinkConfig);
}
@Override
protected void initialize() throws ResourceManagerException {
isStopped = false;
try {
kubernetesConnectionManager = createKubernetesConnectionManager();
} catch (Exception e) {
throw new ResourceManagerException("Could not start resource manager client.", e);
}
try {
getWorkerNodesFromPreviousAttempts();
} catch (Exception e) {
throw new ResourceManagerException("Could not get pods from previous attempts.", e);
}
try {
setupOwnerReference();
} catch (Exception e) {
throw new ResourceManagerException("Could not setup owner reference.", e);
}
try {
setupTaskManagerConfigMap();
} catch (Exception e) {
throw new ResourceManagerException("Could not upload TaskManager config map.", e);
}
try {
podEventHandler = (action, pod) -> runAsync(() -> handlePodMessage(action, pod));
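// if the watcher is closed, e.g. by the API server, keep retrying until a new one is started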
watcherCloseHandler = (exception) -> {
while (true) {
try {
watcher = createAndStartWatcher();
break;
} catch (Exception e) {
log.error("Can't create and start watcher, should try it again.", e);
}
}
};
watcher = createAndStartWatcher();
} catch (Exception e) {
throw new ResourceManagerException(
"Could not create and start watcher.", e);
}
try {
requestWorkerNodes();
} catch (Exception e) {
throw new ResourceManagerException(
"Could not request worker nodes.", e);
}
}
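/**
 * Derives the owner reference from the cluster service so that the task manager pods and
 * config maps are garbage-collected by Kubernetes when the service is deleted.
 */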
protected void setupOwnerReference() throws ResourceManagerException {
Service service = kubernetesConnectionManager.getService(clusterId + Constants.SERVICE_NAME_SUFFIX);
if (service != null) {
ownerReference = KubernetesRMUtils.createOwnerReference(service);
} else {
throw new ResourceManagerException("Failed to get service " + clusterId + Constants.SERVICE_NAME_SUFFIX);
}
}
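/** Creates (or replaces) the config map that holds the task manager configuration. */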
protected void setupTaskManagerConfigMap() throws ResourceManagerException {
tmConfigMap = KubernetesRMUtils.createTaskManagerConfigMap(flinkConfig, confDir,
ownerReference, taskManagerConfigMapName);
kubernetesConnectionManager.createOrReplaceConfigMap(tmConfigMap);
}
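/** Starts a watcher on all pods carrying the task manager labels of this cluster. */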
protected Watch createAndStartWatcher() throws ResourceManagerException {
return kubernetesConnectionManager.createAndStartPodsWatcher(
taskManagerPodLabels, podEventHandler, watcherCloseHandler);
}
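/**
 * Dispatches pod events from the watcher: ADDED pods are registered as worker nodes,
 * MODIFIED/ERROR pods are removed if terminated, and DELETED pods are removed directly.
 */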
protected void handlePodMessage(Watcher.Action action, Pod pod) {
ResourceID resourceId = new ResourceID(pod.getMetadata().getName());
log.info("Received {} event for worker node {}, details: {}", action, resourceId, pod.getStatus());
switch (action) {
case ADDED:
if (removePodIfTerminated(pod)) {
break;
}
if (workerNodeMap.containsKey(resourceId)) {
log.info("Skip adding worker node {} since it's already exist!", resourceId);
} else {
try {
addWorkerNode(pod, true);
} catch (Exception e) {
String fatalMsg = "Failed to add work node. Exiting, bye...";
onFatalError(new ResourceManagerException(fatalMsg));
}
}
break;
case MODIFIED:
removePodIfTerminated(pod);
break;
case ERROR:
removePodIfTerminated(pod);
break;
case DELETED:
removeWorkerNode(
new ResourceID(pod.getMetadata().getName()),
"Pod is deleted.", false);
break;
default:
log.debug("Skip handling {} event for pod {}", action,
pod.getMetadata().getName());
break;
}
}
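/**
 * Removes the worker node backing the given pod if any of its containers has terminated.
 * Returns true if a terminated container state was found.
 */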
private boolean removePodIfTerminated(Pod pod) {
if (pod.getStatus() != null && !pod.getStatus().getContainerStatuses().isEmpty()) {
List<ContainerStateTerminated> podTerminatedStates =
pod.getStatus().getContainerStatuses().stream()
.filter(e -> e.getState() != null && e.getState().getTerminated() != null)
.map(e -> e.getState().getTerminated()).collect(
Collectors.toList());
if (!podTerminatedStates.isEmpty()) {
// increase failed attempts since the pod terminated exceptionally
removeWorkerNode(new ResourceID(pod.getMetadata().getName()),
"Pod terminated : " + podTerminatedStates, true);
return true;
}
}
return false;
}
private void increaseWorkerNodeFailedAttempts() {
workerNodeFailedAttempts.incrementAndGet();
log.info("Worker node failed attempts: {}, max failed attempts: {}",
workerNodeFailedAttempts.get(),
workerNodeMaxFailedAttempts);
}
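/** Treats it as a fatal error when the worker node failures reach the configured maximum. */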
private void checkWorkerNodeFailedAttempts() {
if (workerNodeFailedAttempts.get()
>= workerNodeMaxFailedAttempts) {
isStopped = true;
String fatalMsg = "Worker node failures have reached the maximum failed attempts ("
+ workerNodeMaxFailedAttempts + "). Exiting, bye...";
onFatalError(new ResourceManagerException(fatalMsg));
}
}
protected long generateNewPodId() {
return maxPodId.incrementAndGet();
}
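/**
 * Creates a new task manager pod with a fresh pod id and returns the resource id
 * derived from the pod name.
 */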
protected ResourceID requestNewWorkerNode() throws ResourceManagerException {
String taskManagerPodName = taskManagerPodNamePrefix + generateNewPodId();
Container container = KubernetesRMUtils.createTaskManagerContainer(
flinkConfig, taskManagerResource, confDir, taskManagerPodName, null, null, null);
log.info("Task manager start command: " + container.getArgs());
Pod taskManagerPod = KubernetesRMUtils
.createTaskManagerPod(taskManagerPodLabels, taskManagerPodName,
taskManagerConfigMapName, ownerReference, container, tmConfigMap);
kubernetesConnectionManager.createPod(taskManagerPod);
return new ResourceID(taskManagerPodName);
}
@Override
public CompletableFuture<Void> postStop() {
// shut down all components
Throwable firstException = null;
if (kubernetesConnectionManager != null) {
try {
kubernetesConnectionManager.close();
} catch (Throwable t) {
firstException = ExceptionUtils
.firstOrSuppressed(t, firstException);
}
}
if (watcher != null) {
try {
watcher.close();
} catch (Throwable t) {
firstException = ExceptionUtils.firstOrSuppressed(t, firstException);
}
}
final CompletableFuture<Void> terminationFuture = super.postStop();
if (firstException != null) {
return FutureUtils.completedExceptionally(new FlinkException("Error while shutting down Kubernetes resource manager", firstException));
} else {
return terminationFuture;
}
}
@Override
protected synchronized void internalDeregisterApplication(
ApplicationStatus finalStatus,
@Nullable String diagnostics) throws ResourceManagerException {
log.info("Unregister application from the Kubernetes Resource Manager, "
+ "finalStatus: {}, diagnostics: {}", finalStatus, diagnostics);
isStopped = true;
// remove all TM pods
kubernetesConnectionManager.removePods(taskManagerPodLabels);
}
@Override
public void startNewWorker(ResourceProfile resourceProfile) {
requestWorkerNodes();
}
@Override
public boolean stopWorker(KubernetesWorkerNode workerNode) {
if (workerNode != null) {
return removeWorkerNode(workerNode.getResourceID(), "Stop worker", false);
}
return false;
}
@Override
protected KubernetesWorkerNode workerStarted(ResourceID resourceID) {
return workerNodeMap.get(resourceID);
}
@Override
public void cancelNewWorker(ResourceProfile resourceProfile) {
}
@Override
protected int getNumberAllocatedWorkers() {
return workerNodeMap.size();
}
protected ConcurrentMap<ResourceID, KubernetesWorkerNode> getWorkerNodes() {
return workerNodeMap;
}
protected Set<ResourceID> getPendingWorkerNodes() {
return pendingWorkerNodes;
}
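/**
 * Requests as many new worker nodes as needed to reach the configured number of
 * task managers, taking both registered and pending workers into account.
 */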
protected synchronized void requestWorkerNodes() {
if (isStopped) {
return;
}
int requiredWorkerNum =
workerNum - workerNodeMap.size() - pendingWorkerNodes.size();
if (requiredWorkerNum < 1) {
log.info(
"Allocated and pending containers have reached the limit {}, will not allocate more.",
workerNum);
return;
}
try {
for (int i = 0; i < requiredWorkerNum; ++i) {
ResourceID newResourceId = requestNewWorkerNode();
pendingWorkerNodes.add(newResourceId);
log.info("Add pending worker node: {}", newResourceId);
}
} catch (Exception e) {
String fatalMsg = "Failed to request new worker node. Exiting, bye...";
onFatalError(new ResourceManagerException(fatalMsg, e));
}
log.info("Number pending requests {}. Requesting new container with resources {}. ",
pendingWorkerNodes.size(), taskManagerResource);
}
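/**
 * Called after the register timeout: if the task manager behind the worker node has not
 * registered yet, counts a failed attempt, stops the worker and requests a replacement.
 */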
protected synchronized void checkTMRegistered(ResourceID resourceId) {
KubernetesWorkerNode node = workerNodeMap.get(resourceId);
if (node != null && !taskExecutorRegistered(resourceId)) {
// increase failed attempts since the task manager did not register in time
increaseWorkerNodeFailedAttempts();
log.info("Task manager {} did not register in {}, will stop it and request a new one.", resourceId, taskManagerRegisterTimeout);
stopWorker(node);
}
}
@VisibleForTesting
protected void setOwnerReference(OwnerReference ownerReference) {
this.ownerReference = ownerReference;
}
@VisibleForTesting
protected boolean isStopped() {
return isStopped;
}
}