// blob: 998d0108c6eaa6700a4ad0fc7d8ac8dac195651e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Tests for the {@link ResourceManager} and {@link TaskExecutor} interaction.
 *
 * <p>Each test runs against its own {@link TestingRpcService} and its own resource manager
 * instance. Both are created in {@link #setup()} and shut down again in {@link #teardown()}, so
 * no RPC service outlives the test that used it.
 */
public class ResourceManagerTaskExecutorTest extends TestLogger {

    /** RPC timeout used for all calls that are expected to complete. */
    private static final Time TIMEOUT = TestingUtils.infiniteTime();

    /** Resource profile reported as default slot profile in the registrations of this test. */
    private static final ResourceProfile DEFAULT_SLOT_PROFILE =
            ResourceProfile.fromResources(1.0, 1234);

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorResource();

    /**
     * RPC service of the currently running test. It is {@code null} between {@link #teardown()}
     * and the next {@link #setup()}.
     */
    private static TestingRpcService rpcService;

    private TestingTaskExecutorGateway taskExecutorGateway;

    private final int dataPort = 1234;

    private final int jmxPort = 23456;

    private final HardwareDescription hardwareDescription =
            new HardwareDescription(1, 2L, 3L, 4L);

    private ResourceID taskExecutorResourceID;

    private TestingResourceManagerService rmService;

    /** Gateway to the resource manager started in {@link #createAndStartResourceManager()}. */
    private ResourceManagerGateway rmGateway;

    /** Gateway to the same resource manager address, but connected with a freshly generated id. */
    private ResourceManagerGateway wronglyFencedGateway;

    /** Creates the RPC service that is used by the first test of this class. */
    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    /**
     * Prepares the RPC service, the task executor gateway, the resource manager and a wrongly
     * fenced resource manager gateway for a single test.
     *
     * @throws Exception if the resource manager cannot be started or the gateway connection fails
     */
    @Before
    public void setup() throws Exception {
        // teardown() terminates the service of the previous test and clears the field, so only
        // the first test finds the instance created by setupClass() here. Blindly creating a new
        // service would leave the replaced one running without ever being terminated.
        if (rpcService == null) {
            rpcService = new TestingRpcService();
        }

        createAndRegisterTaskExecutorGateway();
        taskExecutorResourceID = ResourceID.generate();

        createAndStartResourceManager();

        wronglyFencedGateway =
                rpcService
                        .connect(
                                rmGateway.getAddress(),
                                ResourceManagerId.generate(),
                                ResourceManagerGateway.class)
                        .get();
    }

    /** Creates a testing task executor gateway and registers it at the RPC service. */
    private void createAndRegisterTaskExecutorGateway() {
        taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
    }

    /**
     * Starts a resource manager service on the test RPC service, grants it leadership and stores
     * the resulting gateway in {@link #rmGateway}.
     *
     * @throws Exception if the resource manager service cannot be started
     */
    private void createAndStartResourceManager() throws Exception {
        final TestingLeaderElection leaderElection = new TestingLeaderElection();

        rmService =
                TestingResourceManagerService.newBuilder()
                        .setRpcService(rpcService)
                        .setRmLeaderElection(leaderElection)
                        .build();

        rmService.start();
        rmService.isLeader(UUID.randomUUID()).join();

        rmGateway =
                rmService
                        .getResourceManagerGateway()
                        .orElseThrow(
                                () ->
                                        new AssertionError(
                                                "RM not available after confirming leadership."));
    }

    /**
     * Cleans up the resource manager service and terminates the RPC service of the finished test.
     *
     * @throws Exception if the resource manager reported a fatal error or the shutdown fails
     */
    @After
    public void teardown() throws Exception {
        try {
            if (rmService != null) {
                rmService.rethrowFatalErrorIfAny();
                rmService.cleanUp();
            }
        } finally {
            // Terminate the per-test RPC service even if the resource manager clean-up failed.
            // The field is cleared first so that neither the next setup() nor teardownClass()
            // touches a service that has already been shut down.
            if (rpcService != null) {
                final TestingRpcService finishedRpcService = rpcService;
                rpcService = null;
                RpcUtils.terminateRpcService(finishedRpcService);
            }
        }
    }

    /**
     * Safety net that terminates the RPC service if it is still set, e.g. because no test of this
     * class has been executed.
     *
     * @throws Exception if terminating the RPC service fails
     */
    @AfterClass
    public static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService);
        }
    }

    /**
     * Test receive normal registration from task executor and receive duplicate registration from
     * task executor.
     */
    @Test
    public void testRegisterTaskExecutor() throws Exception {
        // test response successful
        CompletableFuture<RegistrationResponse> successfulFuture =
                registerTaskExecutor(rmGateway, taskExecutorGateway.getAddress());
        RegistrationResponse response = successfulFuture.get();
        assertTrue(response instanceof TaskExecutorRegistrationSuccess);

        final TaskManagerInfoWithSlots taskManagerInfoWithSlots =
                rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
        assertThat(
                taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId(),
                equalTo(taskExecutorResourceID));

        // test response successful with instanceID not equal to previous when receive duplicate
        // registration from taskExecutor
        CompletableFuture<RegistrationResponse> duplicateFuture =
                registerTaskExecutor(rmGateway, taskExecutorGateway.getAddress());
        RegistrationResponse duplicateResponse = duplicateFuture.get();
        assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
        assertNotEquals(
                ((TaskExecutorRegistrationSuccess) response).getRegistrationId(),
                ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());

        // the duplicate registration must not show up as a second task manager
        assertThat(rmGateway.requestResourceOverview(TIMEOUT).get().getNumberTaskManagers(), is(1));
    }

    /**
     * Test delayed registration of task executor where the delay is introduced during connection
     * from resource manager to the registering task executor.
     */
    @Test
    public void testDelayedRegisterTaskExecutor() throws Exception {
        final Time fastTimeout = Time.milliseconds(1L);
        try {
            final OneShotLatch startConnection = new OneShotLatch();
            final OneShotLatch finishConnection = new OneShotLatch();

            // first registration is with blocking connection
            rpcService.setRpcGatewayFutureFunction(
                    rpcGateway ->
                            CompletableFuture.supplyAsync(
                                    () -> {
                                        startConnection.trigger();
                                        try {
                                            finishConnection.await();
                                        } catch (InterruptedException e) {
                                            // keep the interruption visible to the executor
                                            // instead of silently dropping it
                                            Thread.currentThread().interrupt();
                                        }
                                        return rpcGateway;
                                    },
                                    EXECUTOR_RESOURCE.getExecutor()));

            final TaskExecutorRegistration taskExecutorRegistration =
                    createTaskExecutorRegistration(
                            taskExecutorGateway.getAddress(), DEFAULT_SLOT_PROFILE);

            CompletableFuture<RegistrationResponse> firstFuture =
                    rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
            try {
                firstFuture.get();
                fail(
                        "Should have failed because connection to taskmanager is delayed beyond timeout");
            } catch (Exception e) {
                final Throwable cause = ExceptionUtils.stripExecutionException(e);
                assertThat(cause, instanceOf(TimeoutException.class));
                assertThat(
                        cause.getMessage(),
                        containsString("ResourceManagerGateway.registerTaskExecutor"));
            }

            // make sure the blocked connection attempt of the first registration is in flight
            startConnection.await();

            // second registration after timeout is with no delay, expecting it to be succeeded
            rpcService.resetRpcGatewayFutureFunction();
            CompletableFuture<RegistrationResponse> secondFuture =
                    rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT);
            RegistrationResponse response = secondFuture.get();
            assertTrue(response instanceof TaskExecutorRegistrationSuccess);

            // on success, send slot report for taskmanager registration
            final SlotReport slotReport =
                    new SlotReport(
                            new SlotStatus(
                                    new SlotID(taskExecutorResourceID, 0), ResourceProfile.ANY));
            rmGateway
                    .sendSlotReport(
                            taskExecutorResourceID,
                            ((TaskExecutorRegistrationSuccess) response).getRegistrationId(),
                            slotReport,
                            TIMEOUT)
                    .get();

            // let the remaining part of the first registration proceed
            finishConnection.trigger();
            Thread.sleep(1L);

            // verify that the latest registration is valid not being unregistered by the delayed
            // one
            final TaskManagerInfoWithSlots taskManagerInfoWithSlots =
                    rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
            assertThat(
                    taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId(),
                    equalTo(taskExecutorResourceID));
            assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getNumberSlots(), equalTo(1));
        } finally {
            // never leave the blocking connection function installed on the RPC service
            rpcService.resetRpcGatewayFutureFunction();
        }
    }

    /** Tests that a TaskExecutor can disconnect from the {@link ResourceManager}. */
    @Test
    public void testDisconnectTaskExecutor() throws Exception {
        final int numberSlots = 10;
        final TaskExecutorRegistration taskExecutorRegistration =
                createTaskExecutorRegistration(
                        taskExecutorGateway.getAddress(),
                        DEFAULT_SLOT_PROFILE.multiply(numberSlots));

        final RegistrationResponse registrationResponse =
                rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get();
        assertThat(registrationResponse, instanceOf(TaskExecutorRegistrationSuccess.class));
        final InstanceID registrationId =
                ((TaskExecutorRegistrationSuccess) registrationResponse).getRegistrationId();

        final Collection<SlotStatus> slots = createSlots(numberSlots);
        final SlotReport slotReport = new SlotReport(slots);
        rmGateway.sendSlotReport(taskExecutorResourceID, registrationId, slotReport, TIMEOUT).get();

        final ResourceOverview resourceOverview = rmGateway.requestResourceOverview(TIMEOUT).get();
        assertThat(resourceOverview.getNumberTaskManagers(), is(1));
        assertThat(resourceOverview.getNumberRegisteredSlots(), is(numberSlots));

        rmGateway.disconnectTaskManager(
                taskExecutorResourceID, new FlinkException("testDisconnectTaskExecutor"));

        final ResourceOverview afterDisconnectResourceOverview =
                rmGateway.requestResourceOverview(TIMEOUT).get();
        assertThat(afterDisconnectResourceOverview.getNumberTaskManagers(), is(0));
        assertThat(afterDisconnectResourceOverview.getNumberRegisteredSlots(), is(0));
    }

    /**
     * Creates one {@link SlotStatus} with {@link ResourceProfile#ANY} per slot index in {@code
     * [0, numberSlots)}, all belonging to the task executor of this test.
     *
     * @param numberSlots number of slot statuses to create
     * @return the created slot statuses
     */
    private Collection<SlotStatus> createSlots(int numberSlots) {
        return IntStream.range(0, numberSlots)
                .mapToObj(
                        index ->
                                new SlotStatus(
                                        new SlotID(taskExecutorResourceID, index),
                                        ResourceProfile.ANY))
                .collect(Collectors.toList());
    }

    /** Test receive registration with unmatched leadershipId from task executor. */
    @Test
    public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
        // test throw exception when receive a registration from taskExecutor which takes unmatched
        // leaderSessionId
        CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
                registerTaskExecutor(wronglyFencedGateway, taskExecutorGateway.getAddress());

        try {
            unMatchedLeaderFuture.get();
            fail(
                    "Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
        } catch (ExecutionException e) {
            assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
        }
    }

    /** Test receive registration with invalid address from task executor. */
    @Test
    public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
        // test throw exception when receive a registration from taskExecutor which takes invalid
        // address
        String invalidAddress = "/taskExecutor2";

        CompletableFuture<RegistrationResponse> invalidAddressFuture =
                registerTaskExecutor(rmGateway, invalidAddress);
        assertTrue(invalidAddressFuture.get() instanceof RegistrationResponse.Failure);
    }

    /**
     * Registers the task executor of this test at the given resource manager gateway, using
     * {@link #DEFAULT_SLOT_PROFILE} for both resource profiles and {@link #TIMEOUT} as timeout.
     *
     * @param resourceManagerGateway gateway to send the registration to
     * @param taskExecutorAddress address announced in the registration
     * @return future with the registration response
     */
    private CompletableFuture<RegistrationResponse> registerTaskExecutor(
            ResourceManagerGateway resourceManagerGateway, String taskExecutorAddress) {
        return resourceManagerGateway.registerTaskExecutor(
                createTaskExecutorRegistration(taskExecutorAddress, DEFAULT_SLOT_PROFILE), TIMEOUT);
    }

    /**
     * Builds the registration message for the task executor of this test. The address is passed
     * both as first and as last constructor argument, and {@link #DEFAULT_SLOT_PROFILE} is always
     * used as the first of the two resource profiles.
     *
     * @param taskExecutorAddress address announced in the registration
     * @param totalResourceProfile second resource profile of the registration
     * @return the registration message
     */
    private TaskExecutorRegistration createTaskExecutorRegistration(
            String taskExecutorAddress, ResourceProfile totalResourceProfile) {
        return new TaskExecutorRegistration(
                taskExecutorAddress,
                taskExecutorResourceID,
                dataPort,
                jmxPort,
                hardwareDescription,
                new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                DEFAULT_SLOT_PROFILE,
                totalResourceProfile,
                taskExecutorAddress);
    }
}