blob: d4e8e4f899ab474d26f59dce62e8080d981af6ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.runtime.clusterframework;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.Constants;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.KubernetesConnectionManager;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingRpcService;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Watcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
/**
* General tests for the Flink Kubernetes resource manager component.
*/
public class KubernetesResourceManagerTest extends KubernetesRMTestBase {
protected SlotManager spySlotManager;
@Before
public void setup() {
super.setup();
spySlotManager = Mockito.spy(new SlotManager(Mockito.mock(ScheduledExecutor.class), Time.seconds(1), Time.seconds(1), Time.seconds(1)));
}
@SuppressWarnings("unchecked")
protected KubernetesResourceManager createKubernetesResourceManager(Configuration conf) {
ResourceManagerConfiguration rmConfig =
new ResourceManagerConfiguration(Time.seconds(1), Time.seconds(10));
final HighAvailabilityServices highAvailabilityServices = Mockito.mock(HighAvailabilityServices.class);
Mockito.when(highAvailabilityServices.getResourceManagerLeaderElectionService()).thenReturn(Mockito.mock(LeaderElectionService.class));
final HeartbeatServices heartbeatServices = Mockito.mock(HeartbeatServices.class);
Mockito.when(heartbeatServices.createHeartbeatManagerSender(
Matchers.any(ResourceID.class),
Matchers.any(HeartbeatListener.class),
Matchers.any(ScheduledExecutor.class),
Matchers.any(Logger.class))).thenReturn(Mockito.mock(HeartbeatManager.class));
return new KubernetesResourceManager(
new TestingRpcService(), "ResourceManager", ResourceID.generate(), conf,
rmConfig,
highAvailabilityServices, heartbeatServices,
spySlotManager,
Mockito.mock(MetricRegistry.class),
Mockito.mock(JobLeaderIdService.class),
new ClusterInformation("localhost", 1234),
Mockito.mock(FatalErrorHandler.class));
}
protected String createPodName(int podId) {
return APP_ID + Constants.TASK_MANAGER_LABEL_SUFFIX + Constants.NAME_SEPARATOR + podId;
}
protected Pod createPod(int priority, int podId) {
Map<String, String> labels = new HashMap<>();
labels.put(Constants.LABEL_APP_KEY, APP_ID);
labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_TASK_MANAGER);
labels.put(Constants.LABEL_PRIORITY_KEY, String.valueOf(priority));
ObjectMeta meta = new ObjectMeta();
meta.setName(createPodName(podId));
meta.setLabels(labels);
Pod pod = new Pod();
pod.setMetadata(meta);
return pod;
}
protected long[] getSortedWorkerNodeIds(KubernetesResourceManager spyKubernetesRM){
long[] ids = spyKubernetesRM.getWorkerNodes().values().stream().mapToLong(e -> e.getPodId()).toArray();
Arrays.sort(ids);
return ids;
}
public KubernetesResourceManager mockResourceManager(
Configuration flinkConf, KubernetesConnectionManager kubernetesConnectionManager) {
KubernetesResourceManager kubernetesSessionRM =
createKubernetesResourceManager(flinkConf);
KubernetesResourceManager spyKubernetesRM = Mockito.spy(kubernetesSessionRM);
spyKubernetesRM.setOwnerReference(new OwnerReferenceBuilder().build());
Mockito.doReturn(kubernetesConnectionManager).when(spyKubernetesRM).createKubernetesConnectionManager();
try {
Mockito.doNothing().when(spyKubernetesRM).setupOwnerReference();
} catch (ResourceManagerException e) {
throw new RuntimeException(e);
}
return spyKubernetesRM;
}
@Test
public void testNormalProcess() throws Exception {
KubernetesConnectionManager kubernetesConnectionManager =
new KubernetesRMTestBase.TestingKubernetesConnectionManager(flinkConf);
KubernetesResourceManager spyKubernetesRM =
mockResourceManager(flinkConf, kubernetesConnectionManager);
// TM register check always return true
Mockito.doNothing().when(spyKubernetesRM).checkTMRegistered(Matchers.any());
// start RM
spyKubernetesRM.start();
// request worker nodes
ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200);
ResourceProfile resourceProfile2 = new ResourceProfile(1.0, 500);
spyKubernetesRM.startNewWorker(resourceProfile1);
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().get(0).size());
spyKubernetesRM.startNewWorker(resourceProfile1);
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().get(0).size());
spyKubernetesRM.startNewWorker(resourceProfile2);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().get(1).size());
spyKubernetesRM.startNewWorker(resourceProfile2);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().get(1).size());
spyKubernetesRM.startNewWorker(resourceProfile2);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(3, spyKubernetesRM.getPendingWorkerNodes().get(1).size());
// cancel 1 worker node(id=1) with priority 0
spyKubernetesRM.cancelNewWorker(resourceProfile1);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().get(0).size());
// cancel 1 worker node(id=3) with priority 1
spyKubernetesRM.cancelNewWorker(resourceProfile2);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().get(1).size());
Assert.assertEquals(new HashSet<>(), spyKubernetesRM.getWorkerNodes().keySet());
// add removed worker node(id=1) with priority 0, should be skipped
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(0, 1));
Assert.assertEquals(0, spyKubernetesRM.getWorkerNodes().size());
// add removed worker node(id=3) with priority 1, should be skipped
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(1, 3));
Assert.assertEquals(0, spyKubernetesRM.getWorkerNodes().size());
// add worker node(id=2) with priority 0
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(0, 2));
Assert.assertEquals(1, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{2L}, getSortedWorkerNodeIds(spyKubernetesRM));
// add worker node(id=4) with priority 1
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(1, 4));
Assert.assertEquals(2, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{2L, 4L}, getSortedWorkerNodeIds(spyKubernetesRM));
// stop worker node(id=4) with priority 1
// mock that number of pending slot requests is larger than number of pending worker nodes
Mockito.doReturn(2).when(spySlotManager).getNumberPendingSlotRequests();
spyKubernetesRM.stopWorker(spyKubernetesRM.getWorkerNodes().get(new ResourceID(createPodName(4))));
// should left 1 worker node(id=2)
Assert.assertEquals(1, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{2L}, getSortedWorkerNodeIds(spyKubernetesRM));
// should request new worker node inside stopWorker, now pending worker nodes: 2
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().get(1).size());
// deregister app
spyKubernetesRM.deregisterApplication(ApplicationStatus.SUCCEEDED, "");
// check stopped
Assert.assertTrue(spyKubernetesRM.isStopped());
}
@Test
public void testNormalProcessForMultiSlots() throws Exception {
// set max core=3 for multiple slots
Configuration newFlinkConf = new Configuration(flinkConf);
newFlinkConf.setDouble(TaskManagerOptions.TASK_MANAGER_MULTI_SLOTS_MAX_CORE, 4.0);
KubernetesConnectionManager kubernetesConnectionManager =
new KubernetesRMTestBase.TestingKubernetesConnectionManager(newFlinkConf);
KubernetesResourceManager spyKubernetesRM =
mockResourceManager(newFlinkConf, kubernetesConnectionManager);
// TM register check always return true
Mockito.doNothing().when(spyKubernetesRM).checkTMRegistered(Matchers.any());
// start RM
spyKubernetesRM.start();
// request 50 worker nodes with priority 0 and 1
ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200);
ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 200);
int requestNum = 0;
for (int i = 0; i < 50; i++) {
requestNum++;
spyKubernetesRM.startNewWorker(resourceProfile1);
spyKubernetesRM.startNewWorker(resourceProfile2);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(requestNum % 4 == 0 ? requestNum / 4 : requestNum / 4 + 1,
spyKubernetesRM.getPendingWorkerNodes().get(0).size());
Assert.assertEquals(requestNum % 2 == 0 ? requestNum / 2 : requestNum / 2 + 1,
spyKubernetesRM.getPendingWorkerNodes().get(1).size());
}
// cancel 47 worker nodes with priority 0 and 1
for (int j = 0; j < 47; j++) {
requestNum--;
spyKubernetesRM.cancelNewWorker(resourceProfile1);
spyKubernetesRM.cancelNewWorker(resourceProfile2);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(requestNum % 4 == 0 ? requestNum / 4 : requestNum / 4 + 1,
spyKubernetesRM.getPendingWorkerNodes().get(0).size());
Assert.assertEquals(requestNum % 2 == 0 ? requestNum / 2 : requestNum / 2 + 1,
spyKubernetesRM.getPendingWorkerNodes().get(1).size());
}
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().get(0).size());
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().get(1).size());
List<String> ids = spyKubernetesRM.getPendingWorkerNodes().values().stream()
.flatMap(e -> e.stream()).map(e -> e.getResourceIdString()).collect(Collectors.toList());
Collections.sort(ids);
Assert.assertEquals(3, ids.size());
Assert.assertArrayEquals(new String[]{createPodName(36), createPodName(37),
createPodName(38)}, ids.toArray());
// add removed worker node(id=1) with priority 0, should be skipped
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(0, 1));
Assert.assertEquals(0, spyKubernetesRM.getWorkerNodes().size());
// add removed worker node(id=2) with priority 1, should be skipped
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(1, 2));
Assert.assertEquals(0, spyKubernetesRM.getWorkerNodes().size());
// add worker node(id=37) with priority 0
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(0, 37));
Assert.assertEquals(1, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{37L}, getSortedWorkerNodeIds(spyKubernetesRM));
// add worker node(id=38) with priority 1
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(1, 38));
Assert.assertEquals(2, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{37L, 38L}, getSortedWorkerNodeIds(spyKubernetesRM));
// add worker node(id=36) with priority 1
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(1, 36));
Assert.assertEquals(3, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{36L, 37L, 38L}, getSortedWorkerNodeIds(spyKubernetesRM));
// no pending now
Assert.assertEquals(0, spyKubernetesRM.getPendingWorkerNodes().get(0).size());
Assert.assertEquals(0, spyKubernetesRM.getPendingWorkerNodes().get(1).size());
// stop worker node(id=36) with priority 1
// mock that number of pending slot requests is larger than number of pending worker nodes
Mockito.doReturn(2).when(spySlotManager).getNumberPendingSlotRequests();
spyKubernetesRM.stopWorker(spyKubernetesRM.getWorkerNodes().get(new ResourceID(createPodName(36))));
// should left 2 worker node(id=37,38)
Assert.assertEquals(2, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{37L, 38L}, getSortedWorkerNodeIds(spyKubernetesRM));
// should request new worker node inside stopWorker, now pending worker nodes: 1
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().get(1).size());
// deregister app
spyKubernetesRM.deregisterApplication(ApplicationStatus.SUCCEEDED, "");
// check stopped
Assert.assertTrue(spyKubernetesRM.isStopped());
}
@Test
public void testGetPreviousWorkerNodes() throws Exception {
KubernetesConnectionManager kubernetesConnectionManager =
new KubernetesRMTestBase.TestingKubernetesConnectionManager(flinkConf);
// add 5 pods as previous worker nodes
kubernetesConnectionManager.createPod(createPod(0, 1));
kubernetesConnectionManager.createPod(createPod(0, 3));
kubernetesConnectionManager.createPod(createPod(0, 5));
kubernetesConnectionManager.createPod(createPod(2, 4));
kubernetesConnectionManager.createPod(createPod(2, 6));
KubernetesResourceManager spyKubernetesRM =
mockResourceManager(flinkConf, kubernetesConnectionManager);
// TM register check always return true
Mockito.doNothing().when(spyKubernetesRM).checkTMRegistered(Matchers.any());
// start RM, get previous 5 worker nodes
spyKubernetesRM.start();
Assert.assertEquals(5, spyKubernetesRM.getNumberAllocatedWorkers());
Assert.assertArrayEquals(new long[]{1L, 3L, 4L, 5L, 6L}, getSortedWorkerNodeIds(spyKubernetesRM));
Assert.assertEquals(0, spyKubernetesRM.getPendingWorkerNodes().size());
// request worker nodes with two different profiles, priority should be 3 and 4
ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200);
ResourceProfile resourceProfile2 = new ResourceProfile(1.0, 500);
spyKubernetesRM.startNewWorker(resourceProfile1);
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().get(3).size());
spyKubernetesRM.startNewWorker(resourceProfile1);
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().get(3).size());
spyKubernetesRM.startNewWorker(resourceProfile2);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().get(4).size());
spyKubernetesRM.startNewWorker(resourceProfile2);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().get(4).size());
spyKubernetesRM.startNewWorker(resourceProfile2);
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(3, spyKubernetesRM.getPendingWorkerNodes().get(4).size());
// add invalid worker node(id=2) with priority 0, should be skipped
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(0, 2));
Assert.assertEquals(5, spyKubernetesRM.getWorkerNodes().size());
// add already exist worker node(id=4) with priority 2, should be skipped
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(2, 4));
Assert.assertEquals(5, spyKubernetesRM.getWorkerNodes().size());
// add worker node(id=7) with priority 3
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(3, 7));
Assert.assertEquals(6, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{1L, 3L, 4L, 5L, 6L, 7L}, getSortedWorkerNodeIds(spyKubernetesRM));
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().get(3).size());
// add worker node(id=9) with priority 4
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(4, 9));
Assert.assertEquals(7, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{1L, 3L, 4L, 5L, 6L, 7L, 9L}, getSortedWorkerNodeIds(spyKubernetesRM));
Assert.assertEquals(2, spyKubernetesRM.getPendingWorkerNodes().get(4).size());
// add worker node(id=8) with priority 3
spyKubernetesRM.handlePodMessage(Watcher.Action.ADDED, createPod(3, 8));
Assert.assertEquals(8, spyKubernetesRM.getWorkerNodes().size());
Assert.assertArrayEquals(new long[]{1L, 3L, 4L, 5L, 6L, 7L, 8L, 9L}, getSortedWorkerNodeIds(spyKubernetesRM));
Assert.assertEquals(0, spyKubernetesRM.getPendingWorkerNodes().get(3).size());
// request worker nodes with priority 3
spyKubernetesRM.startNewWorker(resourceProfile1);
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().get(3).size());
// deregister app
spyKubernetesRM.deregisterApplication(ApplicationStatus.SUCCEEDED, "");
// check stopped
Assert.assertTrue(spyKubernetesRM.isStopped());
}
@Test
public void testTaskManagerRegisterTimeout() throws Exception {
KubernetesConnectionManager kubernetesConnectionManager =
new KubernetesRMTestBase.TestingKubernetesConnectionManager(flinkConf);
KubernetesResourceManager spyKubernetesRM =
mockResourceManager(flinkConf, kubernetesConnectionManager);
// start RM
spyKubernetesRM.start();
Assert.assertEquals(0, spyKubernetesRM.getNumberAllocatedWorkers());
// request worker nodes with priority 0
int requestNum = 5;
ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200);
for (int i = 0; i < requestNum; i++) {
spyKubernetesRM.startNewWorker(resourceProfile1);
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(i + 1, spyKubernetesRM.getPendingWorkerNodes().get(0).size());
}
// add 2 worker nodes(id=1,2) with priority 0
int addNum = 2;
for (int i = 0; i < addNum; i++) {
spyKubernetesRM
.handlePodMessage(Watcher.Action.ADDED, createPod(0, i + 1));
Assert.assertEquals(i + 1, spyKubernetesRM.getWorkerNodes().size());
Assert.assertEquals(requestNum - 1 - i,
spyKubernetesRM.getPendingWorkerNodes().get(0).size());
}
Assert.assertArrayEquals(LongStream.rangeClosed(1, addNum).toArray(), getSortedWorkerNodeIds(spyKubernetesRM));
Mockito.verify(spyKubernetesRM, Mockito.times(requestNum))
.requestNewWorkerNode(Mockito.any(), Mockito.anyInt());
// trigger task manager register timeout, should request new worker nodes
Mockito.doReturn(requestNum).when(spySlotManager).getNumberPendingSlotRequests();
spyKubernetesRM.getWorkerNodes().keySet().stream().forEach(spyKubernetesRM::checkTMRegistered);
Mockito.verify(spyKubernetesRM, Mockito.times(requestNum + addNum))
.requestNewWorkerNode(Mockito.any(), Mockito.anyInt());
Assert.assertEquals(0, spyKubernetesRM.getNumberAllocatedWorkers());
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(requestNum, spyKubernetesRM.getPendingWorkerNodes().get(0).size());
// deregister app
spyKubernetesRM.deregisterApplication(ApplicationStatus.SUCCEEDED, "");
// check stopped
Assert.assertTrue(spyKubernetesRM.isStopped());
}
@Test
public void testExitWhenReachMaxFailedAttempts() throws Exception {
// set max failed attempts = 2
int maxFailedAttempts = 2;
Configuration newFlinkConf = new Configuration(flinkConf);
newFlinkConf.setInteger(KubernetesConfigOptions.WORKER_NODE_MAX_FAILED_ATTEMPTS, maxFailedAttempts);
KubernetesConnectionManager kubernetesConnectionManager =
new KubernetesRMTestBase.TestingKubernetesConnectionManager(newFlinkConf);
KubernetesResourceManager spyKubernetesRM =
mockResourceManager(newFlinkConf, kubernetesConnectionManager);
// start RM
spyKubernetesRM.start();
Assert.assertEquals(0, spyKubernetesRM.getNumberAllocatedWorkers());
Assert.assertEquals(0, spyKubernetesRM.getPendingWorkerNodes().size());
// request worker nodes with priority 0
int requestNum = 5;
ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200);
for (int i = 0; i < requestNum; i++) {
spyKubernetesRM.startNewWorker(resourceProfile1);
Assert.assertEquals(1, spyKubernetesRM.getPendingWorkerNodes().size());
Assert.assertEquals(i + 1, spyKubernetesRM.getPendingWorkerNodes().get(0).size());
}
// add 2 worker nodes(id=1,2) with priority 0
int addNum = 2;
for (int i = 0; i < addNum; i++) {
spyKubernetesRM
.handlePodMessage(Watcher.Action.ADDED, createPod(0, i + 1));
Assert.assertEquals(i + 1, spyKubernetesRM.getWorkerNodes().size());
Assert.assertEquals(requestNum - 1 - i,
spyKubernetesRM.getPendingWorkerNodes().get(0).size());
}
Assert.assertArrayEquals(LongStream.rangeClosed(1, addNum).toArray(), getSortedWorkerNodeIds(spyKubernetesRM));
Mockito.verify(spyKubernetesRM, Mockito.times(requestNum))
.requestNewWorkerNode(Mockito.any(), Mockito.anyInt());
// trigger task manager register timeout, should request new worker nodes
Mockito.doReturn(requestNum).when(spySlotManager).getNumberPendingSlotRequests();
spyKubernetesRM.getWorkerNodes().keySet().stream().forEach(spyKubernetesRM::checkTMRegistered);
// won't add new request for the last attempt
Assert.assertEquals(requestNum - 1, spyKubernetesRM.getPendingWorkerNodes().get(0).size());
// will stop internally
Assert.assertTrue(spyKubernetesRM.isStopped());
}
@Test
public void testConnectionLostButNotReachMaxRetryTimes() throws ResourceManagerException {
super.testConnectionLostButNotReachMaxRetryTimes();
}
@Test
public void testConnectionLostAndReachMaxRetryTimes() throws ResourceManagerException {
super.testConnectionLostAndReachMaxRetryTimes();
}
}