blob: c531daf6d76ad6f9a4199f8137d2e46c2117dc91 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
/** Tests for the partition-lifecycle logic in the {@link ResourceManager}. */
public class ResourceManagerPartitionLifecycleTest extends TestLogger {

    /** RPC service shared by all tests of this class; created once in {@link #setupClass()}. */
    private static TestingRpcService rpcService;

    /**
     * Resource manager service of the currently running test; assigned by {@link
     * #createAndStartResourceManager()} and {@code null} until then.
     */
    private TestingResourceManagerService resourceManagerService;

    /** Creates the RPC service that all tests register their task executor gateways with. */
    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    /**
     * Intentionally empty: the resource manager is created on demand by {@link #runTest}, not as
     * part of the per-test setup.
     */
    @Before
    public void setup() throws Exception {}

    /**
     * Surfaces any fatal error recorded by the resource manager service and cleans the service up.
     *
     * <p>The clean-up runs in a {@code finally} block so that it is still executed when a fatal
     * error is rethrown; otherwise the service of a failed test would never be cleaned up while the
     * shared {@link #rpcService} keeps being used by subsequent tests.
     *
     * @throws Exception if a fatal error was recorded or the clean-up itself fails
     */
    @After
    public void after() throws Exception {
        if (resourceManagerService != null) {
            try {
                resourceManagerService.rethrowFatalErrorIfAny();
            } finally {
                resourceManagerService.cleanUp();
            }
        }
    }

    /** Terminates the shared RPC service, if it was created. */
    @AfterClass
    public static void tearDownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService);
        }
    }

    /**
     * Sends two heartbeats for the same data set from the first task executor, the second one
     * reporting one partition fewer than the first, and expects the release-cluster-partitions
     * consumer of that task executor to be called with the data set ID.
     */
    @Test
    public void testClusterPartitionReportHandling() throws Exception {
        final CompletableFuture<Collection<IntermediateDataSetID>> clusterPartitionReleaseFuture =
                new CompletableFuture<>();
        runTest(
                builder ->
                        builder.setReleaseClusterPartitionsConsumer(
                                clusterPartitionReleaseFuture::complete),
                (resourceManagerGateway, taskManagerId1, ignored) -> {
                    IntermediateDataSetID dataSetID = new IntermediateDataSetID();
                    ResultPartitionID resultPartitionID = new ResultPartitionID();

                    // initial report: both partitions of the data set are present
                    resourceManagerGateway.heartbeatFromTaskManager(
                            taskManagerId1,
                            createTaskExecutorHeartbeatPayload(
                                    dataSetID, 2, resultPartitionID, new ResultPartitionID()));

                    // send a heartbeat containing 1 partition less -> partition loss -> should
                    // result in partition release
                    resourceManagerGateway.heartbeatFromTaskManager(
                            taskManagerId1,
                            createTaskExecutorHeartbeatPayload(dataSetID, 2, resultPartitionID));

                    // blocks until the task executor gateway's release consumer has been invoked
                    Collection<IntermediateDataSetID> intermediateDataSetIDS =
                            clusterPartitionReleaseFuture.get();
                    assertThat(intermediateDataSetIDS, contains(dataSetID));
                });
    }

    /**
     * Reports one partition of the same data set from each of the two task executors, disconnects
     * the second task executor and expects the release-cluster-partitions consumer to be called
     * with the data set ID.
     *
     * <p>The consumer is only installed on the gateway of the first task executor (see {@link
     * #runTest}), so the future completes when that remaining task executor is asked to release.
     */
    @Test
    public void testTaskExecutorShutdownHandling() throws Exception {
        final CompletableFuture<Collection<IntermediateDataSetID>> clusterPartitionReleaseFuture =
                new CompletableFuture<>();
        runTest(
                builder ->
                        builder.setReleaseClusterPartitionsConsumer(
                                clusterPartitionReleaseFuture::complete),
                (resourceManagerGateway, taskManagerId1, taskManagerId2) -> {
                    IntermediateDataSetID dataSetID = new IntermediateDataSetID();
                    resourceManagerGateway.heartbeatFromTaskManager(
                            taskManagerId1,
                            createTaskExecutorHeartbeatPayload(
                                    dataSetID, 2, new ResultPartitionID()));

                    // we need a partition on another task executor so that there's something to
                    // release when one task executor goes down
                    resourceManagerGateway.heartbeatFromTaskManager(
                            taskManagerId2,
                            createTaskExecutorHeartbeatPayload(
                                    dataSetID, 2, new ResultPartitionID()));

                    resourceManagerGateway.disconnectTaskManager(
                            taskManagerId2, new RuntimeException("test exception"));

                    // blocks until the task executor gateway's release consumer has been invoked
                    Collection<IntermediateDataSetID> intermediateDataSetIDS =
                            clusterPartitionReleaseFuture.get();
                    assertThat(intermediateDataSetIDS, contains(dataSetID));
                });
    }

    /**
     * Starts a resource manager, registers two task executors with it and runs the given test
     * action against them.
     *
     * @param taskExecutorBuilderSetup customization applied to the gateway builder of the
     *     <b>first</b> task executor only; the second gateway is built with the builder's defaults
     * @param testAction test logic, invoked with the resource manager gateway and the resource IDs
     *     of both registered task executors
     * @throws Exception if starting the resource manager, registering a task executor or the test
     *     action fails
     */
    private void runTest(TaskExecutorSetup taskExecutorBuilderSetup, TestAction testAction)
            throws Exception {
        final ResourceManagerGateway resourceManagerGateway = createAndStartResourceManager();

        TestingTaskExecutorGatewayBuilder testingTaskExecutorGateway1Builder =
                new TestingTaskExecutorGatewayBuilder();
        taskExecutorBuilderSetup.accept(testingTaskExecutorGateway1Builder);
        final TaskExecutorGateway taskExecutorGateway1 =
                testingTaskExecutorGateway1Builder
                        .setAddress(UUID.randomUUID().toString())
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway1.getAddress(), taskExecutorGateway1);

        final TaskExecutorGateway taskExecutorGateway2 =
                new TestingTaskExecutorGatewayBuilder()
                        .setAddress(UUID.randomUUID().toString())
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway2.getAddress(), taskExecutorGateway2);

        final ResourceID taskManagerId1 = ResourceID.generate();
        final ResourceID taskManagerId2 = ResourceID.generate();
        registerTaskExecutor(
                resourceManagerGateway, taskManagerId1, taskExecutorGateway1.getAddress());
        registerTaskExecutor(
                resourceManagerGateway, taskManagerId2, taskExecutorGateway2.getAddress());

        testAction.accept(resourceManagerGateway, taskManagerId1, taskManagerId2);
    }

    /**
     * Registers a task executor at the given resource manager and asserts that the registration
     * response is a {@link RegistrationResponse.Success}.
     *
     * <p>All registration details other than the ID and the address are fixed test values.
     *
     * @param resourceManagerGateway gateway of the resource manager to register at
     * @param taskExecutorId resource ID of the task executor to register
     * @param taskExecutorAddress address of the task executor; passed to the registration both as
     *     its first and as its last constructor argument
     * @throws Exception if waiting for the registration response fails
     */
    public static void registerTaskExecutor(
            ResourceManagerGateway resourceManagerGateway,
            ResourceID taskExecutorId,
            String taskExecutorAddress)
            throws Exception {
        final TaskExecutorRegistration taskExecutorRegistration =
                new TaskExecutorRegistration(
                        taskExecutorAddress,
                        taskExecutorId,
                        1234,
                        23456,
                        new HardwareDescription(42, 1337L, 1337L, 0L),
                        new TaskExecutorMemoryConfiguration(
                                1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                        ResourceProfile.ZERO,
                        ResourceProfile.ZERO,
                        taskExecutorAddress);
        final CompletableFuture<RegistrationResponse> registrationFuture =
                resourceManagerGateway.registerTaskExecutor(
                        taskExecutorRegistration, TestingUtils.TIMEOUT);
        assertThat(registrationFuture.get(), instanceOf(RegistrationResponse.Success.class));
    }

    /**
     * Builds and starts a resource manager service on the shared RPC service, grants it leadership
     * and waits for the leadership confirmation.
     *
     * <p>The created service is stored in {@link #resourceManagerService} so that {@link #after()}
     * can clean it up.
     *
     * @return gateway of the started resource manager
     * @throws AssertionError if no gateway is available after leadership was confirmed
     * @throws Exception if starting the service or waiting for the confirmation fails
     */
    private ResourceManagerGateway createAndStartResourceManager() throws Exception {
        final TestingLeaderElectionService leaderElectionService =
                new TestingLeaderElectionService();
        resourceManagerService =
                TestingResourceManagerService.newBuilder()
                        .setRpcService(rpcService)
                        .setRmLeaderElectionService(leaderElectionService)
                        .build();
        resourceManagerService.start();

        // first make the ResourceManager the leader
        resourceManagerService.isLeader(UUID.randomUUID());
        leaderElectionService.getConfirmationFuture().get();

        return resourceManagerService
                .getResourceManagerGateway()
                .orElseThrow(
                        () -> new AssertionError("RM not available after confirming leadership."));
    }

    /**
     * Creates a heartbeat payload consisting of an empty {@link SlotReport} and a {@link
     * ClusterPartitionReport} with a single entry for the given data set.
     *
     * @param dataSetId data set the reported partitions belong to
     * @param numTotalPartitions total number of partitions passed to the report entry
     * @param partitionIds IDs of the partitions to report; each one is wrapped in a {@link
     *     TestingShuffleDescriptor}
     * @return heartbeat payload carrying the described cluster partition report
     */
    private static TaskExecutorHeartbeatPayload createTaskExecutorHeartbeatPayload(
            IntermediateDataSetID dataSetId,
            int numTotalPartitions,
            ResultPartitionID... partitionIds) {
        final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors =
                Arrays.stream(partitionIds)
                        .map(TestingShuffleDescriptor::new)
                        .collect(
                                Collectors.toMap(
                                        TestingShuffleDescriptor::getResultPartitionID, d -> d));
        return new TaskExecutorHeartbeatPayload(
                new SlotReport(),
                new ClusterPartitionReport(
                        Collections.singletonList(
                                new ClusterPartitionReport.ClusterPartitionReportEntry(
                                        dataSetId, numTotalPartitions, shuffleDescriptors))));
    }

    /** Customization hook for a task executor gateway builder that may throw. */
    @FunctionalInterface
    private interface TaskExecutorSetup {
        void accept(TestingTaskExecutorGatewayBuilder taskExecutorGatewayBuilder) throws Exception;
    }

    /**
     * Test logic that runs against a started resource manager and the resource IDs of two
     * registered task executors, and that may throw.
     */
    @FunctionalInterface
    private interface TestAction {
        void accept(
                ResourceManagerGateway resourceManagerGateway,
                ResourceID taskExecutorId1,
                ResourceID taskExecutorId2)
                throws Exception;
    }

    /**
     * Minimal {@link ShuffleDescriptor} that only carries a partition ID and reports no local
     * resources.
     */
    private static class TestingShuffleDescriptor implements ShuffleDescriptor {

        private final ResultPartitionID resultPartitionID;

        private TestingShuffleDescriptor(ResultPartitionID resultPartitionID) {
            this.resultPartitionID = resultPartitionID;
        }

        @Override
        public ResultPartitionID getResultPartitionID() {
            return resultPartitionID;
        }

        @Override
        public Optional<ResourceID> storesLocalResourcesOn() {
            return Optional.empty();
        }
    }
}