blob: 25e5220239c7078c610872728566f3f5213971f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.CoreMatchers;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Test;
import javax.annotation.Nonnull;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Tests for the {@link SlotManagerImpl}.
*/
public class SlotManagerImplTest extends TestLogger {
/** Failure cause handed to the slot manager by tests that must supply one, e.g. when unregistering a task manager. */
private static final FlinkException TEST_EXCEPTION =
	new FlinkException("Test exception");

/** Worker resource specification configured with large values in every resource dimension. */
private static final WorkerResourceSpec WORKER_RESOURCE_SPEC =
	new WorkerResourceSpec.Builder()
		.setCpuCores(100.0)
		.setTaskHeapMemoryMB(10000)
		.setTaskOffHeapMemoryMB(10000)
		.setNetworkMemoryMB(10000)
		.setManagedMemoryMB(10000)
		.build();
/**
 * Tests that a task manager and every slot contained in its initial slot report are
 * registered at the slot manager.
 */
@Test
public void testTaskManagerRegistration() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
	final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
	final ResourceID resourceId = ResourceID.generate();
	final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
	final SlotID slotId1 = new SlotID(resourceId, 0);
	final SlotID slotId2 = new SlotID(resourceId, 1);
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);
	final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
	final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
	final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		slotManager.registerTaskManager(taskManagerConnection, slotReport);
		// assertEquals (unlike assertTrue(a == b)) reports the actual count on failure
		assertEquals("The number registered slots does not equal the expected number.", 2, slotManager.getNumberRegisteredSlots());
		assertNotNull(slotManager.getSlot(slotId1));
		assertNotNull(slotManager.getSlot(slotId2));
	}
}
/**
 * Tests that un-registration of task managers will free and remove all registered slots and
 * un-assign pending slot requests that were assigned to them.
 */
@Test
public void testTaskManagerUnregistration() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
	final JobID jobId = new JobID();
	final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
		.setRequestSlotFunction(tuple6 -> {
			assertThat(tuple6.f5, is(equalTo(resourceManagerId)));
			// never completed, so a slot request sent to this gateway stays in flight
			return new CompletableFuture<>();
		})
		.createTestingTaskExecutorGateway();
	final ResourceID resourceId = ResourceID.generate();
	final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
	final SlotID slotId1 = new SlotID(resourceId, 0);
	final SlotID slotId2 = new SlotID(resourceId, 1);
	final AllocationID allocationId1 = new AllocationID();
	final AllocationID allocationId2 = new AllocationID();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);
	// slot 1 is reported as already allocated, slot 2 as free
	final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile, jobId, allocationId1);
	final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
	final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
	final SlotRequest slotRequest = new SlotRequest(
		new JobID(),
		allocationId2,
		resourceProfile,
		"foobar");
	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		slotManager.registerTaskManager(taskManagerConnection, slotReport);
		assertEquals("The number registered slots does not equal the expected number.", 2, slotManager.getNumberRegisteredSlots());
		final TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
		final TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
		assertEquals(TaskManagerSlot.State.ALLOCATED, slot1.getState());
		assertEquals(TaskManagerSlot.State.FREE, slot2.getState());
		assertTrue(slotManager.registerSlotRequest(slotRequest));
		// the request is matched with the free slot 2 and remains pending because the
		// gateway's requestSlot future never completes
		assertEquals(TaskManagerSlot.State.PENDING, slot2.getState());
		final PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId2);
		assertTrue("The pending slot request should have been assigned to slot 2", pendingSlotRequest.isAssigned());
		slotManager.unregisterTaskManager(taskManagerConnection.getInstanceID(), TEST_EXCEPTION);
		assertEquals(0, slotManager.getNumberRegisteredSlots());
		assertFalse(pendingSlotRequest.isAssigned());
	}
}
/**
 * Verifies that registering a slot request while no slot is available makes the slot manager
 * invoke the resource allocation callback of its resource actions.
 */
@Test
public void testSlotRequestWithoutFreeSlots() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final SlotRequest request = new SlotRequest(
		new JobID(),
		new AllocationID(),
		ResourceProfile.fromResources(42.0, 1337),
		"localhost");
	final CompletableFuture<WorkerResourceSpec> requestedWorkerFuture = new CompletableFuture<>();
	final ResourceActions resourceActions = new TestingResourceActionsBuilder()
		.setAllocateResourceConsumer(requestedWorkerFuture::complete)
		.build();

	try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceActions)) {
		slotManager.registerSlotRequest(request);
		// blocks until the allocate-resource callback has been invoked
		requestedWorkerFuture.get();
	}
}
/**
 * Verifies that a slot request is rejected with a {@link ResourceManagerException} when the
 * resource actions refuse to allocate a new resource.
 */
@Test
public void testSlotRequestWithResourceAllocationFailure() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final SlotRequest request = new SlotRequest(
		new JobID(),
		new AllocationID(),
		ResourceProfile.fromResources(42.0, 1337),
		"localhost");
	final ResourceActions refusingResourceActions = new TestingResourceActionsBuilder()
		.setAllocateResourceFunction(ignored -> false)
		.build();

	try (SlotManager slotManager = createSlotManager(resourceManagerId, refusingResourceActions)) {
		slotManager.registerSlotRequest(request);
		fail("The slot request should have failed with a ResourceManagerException.");
	} catch (ResourceManagerException expected) {
		// expected: the allocation function always answers false
	}
}
/**
 * Verifies that a slot request which a registered free slot can satisfy is forwarded to the
 * owning task executor, and that the slot then carries the request's allocation id.
 */
@Test
public void testSlotRequestWithFreeSlot() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceID taskExecutorResourceId = ResourceID.generate();
	final JobID jobId = new JobID();
	final SlotID freeSlotId = new SlotID(taskExecutorResourceId, 0);
	final String targetAddress = "localhost";
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);
	final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, targetAddress);
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();

	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		final CompletableFuture<Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
		// acknowledge every slot request and record a copy of its parameters
		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
			.setRequestSlotFunction(requestParameters -> {
				requestFuture.complete(Tuple6.of(
					requestParameters.f0,
					requestParameters.f1,
					requestParameters.f2,
					requestParameters.f3,
					requestParameters.f4,
					requestParameters.f5));
				return CompletableFuture.completedFuture(Acknowledge.get());
			})
			.createTestingTaskExecutorGateway();
		final SlotReport slotReport = new SlotReport(new SlotStatus(freeSlotId, resourceProfile));

		slotManager.registerTaskManager(
			new TaskExecutorConnection(taskExecutorResourceId, taskExecutorGateway),
			slotReport);

		assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
		final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> expectedRequest =
			Tuple6.of(freeSlotId, jobId, allocationId, resourceProfile, targetAddress, resourceManagerId);
		assertThat(requestFuture.get(), is(equalTo(expectedRequest)));

		final TaskManagerSlot slot = slotManager.getSlot(freeSlotId);
		assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
	}
}
/**
 * Checks that un-registering a pending slot request will cancel it, removing it from all
 * assigned task manager slots and then remove it from the slot manager.
 */
@Test
public void testUnregisterPendingSlotRequest() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
	final ResourceID resourceID = ResourceID.generate();
	final SlotID slotId = new SlotID(resourceID, 0);
	final AllocationID allocationId = new AllocationID();
	// never completes, so the request sent to the task executor stays pending
	final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
		.setRequestSlotFunction(ignored -> new CompletableFuture<>())
		.createTestingTaskExecutorGateway();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1);
	final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
	final SlotReport slotReport = new SlotReport(slotStatus);
	final SlotRequest slotRequest = new SlotRequest(new JobID(), allocationId, resourceProfile, "foobar");
	final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		slotManager.registerTaskManager(taskManagerConnection, slotReport);
		TaskManagerSlot slot = slotManager.getSlot(slotId);
		slotManager.registerSlotRequest(slotRequest);
		assertNotNull(slotManager.getSlotRequest(allocationId));
		assertEquals(TaskManagerSlot.State.PENDING, slot.getState());
		slotManager.unregisterSlotRequest(allocationId);
		assertNull(slotManager.getSlotRequest(allocationId));
		slot = slotManager.getSlot(slotId);
		assertEquals(TaskManagerSlot.State.FREE, slot.getState());
	}
}
/**
 * Verifies that a slot request registered while no slot exists first triggers exactly one
 * resource allocation and is then fulfilled by the slot of a task executor that registers later.
 */
@Test
public void testFulfillingPendingSlotRequest() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceID taskExecutorResourceId = ResourceID.generate();
	final JobID jobId = new JobID();
	final SlotID expectedSlotId = new SlotID(taskExecutorResourceId, 0);
	final String targetAddress = "localhost";
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);
	final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, targetAddress);

	final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setAllocateResourceConsumer(ignored -> allocateResourceCalls.incrementAndGet())
		.build();

	final CompletableFuture<Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
	// acknowledge every slot request and record a copy of its parameters
	final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
		.setRequestSlotFunction(requestParameters -> {
			requestFuture.complete(Tuple6.of(
				requestParameters.f0,
				requestParameters.f1,
				requestParameters.f2,
				requestParameters.f3,
				requestParameters.f4,
				requestParameters.f5));
			return CompletableFuture.completedFuture(Acknowledge.get());
		})
		.createTestingTaskExecutorGateway();
	final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorResourceId, taskExecutorGateway);
	final SlotReport slotReport = new SlotReport(new SlotStatus(expectedSlotId, resourceProfile));

	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		// no task executor is registered yet: the request must trigger a resource allocation
		assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
		assertThat(allocateResourceCalls.get(), is(1));

		// the newly reported free slot should be used for the pending request
		slotManager.registerTaskManager(taskExecutorConnection, slotReport);
		final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> expectedRequest =
			Tuple6.of(expectedSlotId, jobId, allocationId, resourceProfile, targetAddress, resourceManagerId);
		assertThat(requestFuture.get(), is(equalTo(expectedRequest)));

		final TaskManagerSlot slot = slotManager.getSlot(expectedSlotId);
		assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
	}
}
/**
 * Tests that freeing a slot will correctly reset the slot and mark it as a free slot, and that
 * a free call carrying a non-matching allocation id is ignored.
 */
@Test
public void testFreeSlot() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final JobID jobId = new JobID();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
	final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
	final ResourceID resourceID = taskExecutorConnection.getResourceID();
	final SlotID slotId = new SlotID(resourceID, 0);
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);
	final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
	final SlotReport slotReport = new SlotReport(slotStatus);
	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		slotManager.registerTaskManager(
			taskExecutorConnection,
			slotReport);
		final TaskManagerSlot slot = slotManager.getSlot(slotId);
		assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
		// this should be ignored since the allocation id does not match
		slotManager.freeSlot(slotId, new AllocationID());
		assertEquals(TaskManagerSlot.State.ALLOCATED, slot.getState());
		assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
		slotManager.freeSlot(slotId, allocationId);
		assertEquals(TaskManagerSlot.State.FREE, slot.getState());
		assertNull(slot.getAllocationId());
	}
}
/**
 * Verifies that a second pending slot request reusing an already registered allocation id is
 * rejected as a duplicate, even though its job id and resource profile differ.
 */
@Test
public void testDuplicatePendingSlotRequest() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setAllocateResourceConsumer(ignored -> allocateResourceCalls.incrementAndGet())
		.build();
	final AllocationID sharedAllocationId = new AllocationID();
	final SlotRequest firstRequest = new SlotRequest(new JobID(), sharedAllocationId, ResourceProfile.fromResources(1.0, 2), "foobar");
	final SlotRequest duplicateRequest = new SlotRequest(new JobID(), sharedAllocationId, ResourceProfile.fromResources(2.0, 1), "barfoo");

	try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		assertTrue(slotManager.registerSlotRequest(firstRequest));
		assertFalse(slotManager.registerSlotRequest(duplicateRequest));
	}

	// only the first request may have triggered a resource allocation; the duplicate was rejected
	assertThat(allocateResourceCalls.get(), is(1));
}
/**
 * Verifies that a slot request is rejected when its allocation id has already been reported as
 * allocated by a registered task manager.
 */
@Test
public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
	final TaskExecutorConnection taskManagerConnection = createTaskExecutorConnection();
	final JobID jobId = new JobID();
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1);
	final SlotID allocatedSlotId = new SlotID(taskManagerConnection.getResourceID(), 0);

	// the reported slot is already occupied by allocationId
	final SlotReport slotReport = new SlotReport(new SlotStatus(allocatedSlotId, resourceProfile, jobId, allocationId));
	final SlotRequest conflictingRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");

	try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		slotManager.registerTaskManager(taskManagerConnection, slotReport);
		assertFalse(slotManager.registerSlotRequest(conflictingRequest));
	}
}
/**
 * Tests that duplicate slot requests (requests with an already registered allocation id) are
 * also detected after a pending slot request has been fulfilled but not yet freed.
 */
@Test
public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setAllocateResourceConsumer(ignored -> allocateResourceCalls.incrementAndGet())
		.build();
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1.0, 2);
	final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2.0, 1);
	final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
	final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
	final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
	final ResourceID resourceID = ResourceID.generate();
	final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
	final SlotID slotId = new SlotID(resourceID, 0);
	final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1);
	final SlotReport slotReport = new SlotReport(slotStatus);
	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		slotManager.registerTaskManager(taskManagerConnection, slotReport);
		assertTrue(slotManager.registerSlotRequest(slotRequest1));
		final TaskManagerSlot slot = slotManager.getSlot(slotId);
		assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
		assertFalse(slotManager.registerSlotRequest(slotRequest2));
	}
	// The first request is served by the already registered free slot and the second one is
	// rejected as a duplicate, so no resource allocation should have been triggered at all.
	assertThat(allocateResourceCalls.get(), is(0));
}
/**
 * Tests that an already registered allocation id can be reused after the initial slot request
 * has been freed.
 */
@Test
public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setAllocateResourceConsumer(ignored -> allocateResourceCalls.incrementAndGet())
		.build();
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1.0, 2);
	final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2.0, 1);
	final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
	final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
	final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
	final ResourceID resourceID = ResourceID.generate();
	final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
	final SlotID slotId = new SlotID(resourceID, 0);
	// the slot profile (2.0, 2) is large enough for both request profiles
	final SlotStatus slotStatus = new SlotStatus(slotId, ResourceProfile.fromResources(2.0, 2));
	final SlotReport slotReport = new SlotReport(slotStatus);
	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		slotManager.registerTaskManager(taskManagerConnection, slotReport);
		assertTrue(slotManager.registerSlotRequest(slotRequest1));
		final TaskManagerSlot slot = slotManager.getSlot(slotId);
		assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
		slotManager.freeSlot(slotId, allocationId);
		// check that the slot has been freed
		assertEquals(TaskManagerSlot.State.FREE, slot.getState());
		assertNull(slot.getAllocationId());
		assertTrue(slotManager.registerSlotRequest(slotRequest2));
		assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
	}
	// Both requests are served by the already registered slot, so no resource allocation
	// should have been triggered at all.
	assertThat(allocateResourceCalls.get(), is(0));
}
/**
 * Tests that the slot manager ignores slot reports of unknown origin (not registered
 * task managers).
 */
@Test
public void testReceivingUnknownSlotReport() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
	final InstanceID unknownInstanceID = new InstanceID();
	final SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0);
	final ResourceProfile unknownResourceProfile = ResourceProfile.fromResources(1.0, 1);
	final SlotStatus unknownSlotStatus = new SlotStatus(unknownSlotId, unknownResourceProfile);
	final SlotReport unknownSlotReport = new SlotReport(unknownSlotStatus);
	try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		// check that we don't have any slots registered
		assertEquals(0, slotManager.getNumberRegisteredSlots());
		// this should not update anything since the instance id is not known to the slot manager
		assertFalse(slotManager.reportSlotStatus(unknownInstanceID, unknownSlotReport));
		assertEquals(0, slotManager.getNumberRegisteredSlots());
	}
}
/**
 * Tests that slots are updated with respect to the latest incoming slot report. This means that
 * slots for which a report was received are updated accordingly.
 */
@Test
public void testUpdateSlotReport() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
	final JobID jobId = new JobID();
	final AllocationID allocationId = new AllocationID();
	final TaskExecutorConnection taskManagerConnection = createTaskExecutorConnection();
	final ResourceID resourceId = taskManagerConnection.getResourceID();
	final SlotID slotId1 = new SlotID(resourceId, 0);
	final SlotID slotId2 = new SlotID(resourceId, 1);
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1);
	final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
	final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
	// second report: slot 2 now carries an allocation, slot 1 is unchanged
	final SlotStatus newSlotStatus2 = new SlotStatus(slotId2, resourceProfile, jobId, allocationId);
	final SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
	final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));
	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		// check that we don't have any slots registered
		assertEquals(0, slotManager.getNumberRegisteredSlots());
		slotManager.registerTaskManager(taskManagerConnection, slotReport1);
		final TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
		final TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
		assertEquals(2, slotManager.getNumberRegisteredSlots());
		assertEquals(TaskManagerSlot.State.FREE, slot1.getState());
		assertEquals(TaskManagerSlot.State.FREE, slot2.getState());
		assertTrue(slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), slotReport2));
		assertEquals(2, slotManager.getNumberRegisteredSlots());
		assertNotNull(slotManager.getSlot(slotId1));
		assertNotNull(slotManager.getSlot(slotId2));
		// slotId2 should have been allocated for allocationId
		assertEquals(allocationId, slotManager.getSlot(slotId2).getAllocationId());
	}
}
/**
 * Tests that slot requests time out after the specified request timeout. If a slot request
 * times out, then the request is cancelled, removed from the slot manager and the resource
 * manager is notified about the failed allocation.
 */
@Test
public void testSlotRequestTimeout() throws Exception {
	final long allocationTimeout = 50L;
	final CompletableFuture<Tuple2<JobID, AllocationID>> failedAllocationFuture = new CompletableFuture<>();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setNotifyAllocationFailureConsumer(tuple3 -> failedAllocationFuture.complete(Tuple2.of(tuple3.f0, tuple3.f1)))
		.build();
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final JobID jobId = new JobID();
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1);
	final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
	final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
	try (SlotManager slotManager = createSlotManagerBuilder()
			.setSlotRequestTimeout(Time.milliseconds(allocationTimeout))
			.build()) {
		slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
		mainThreadExecutor.execute(() -> {
			try {
				assertTrue(slotManager.registerSlotRequest(slotRequest));
			} catch (Throwable t) {
				// If registration fails (exception or failed assertion), nothing may ever complete
				// failedAllocationFuture. Fail it here so that get() below surfaces the cause
				// instead of blocking indefinitely. Throwable is caught because AssertionError is
				// not an Exception.
				failedAllocationFuture.completeExceptionally(t);
			}
		});
		assertThat(failedAllocationFuture.get(), is(equalTo(Tuple2.of(jobId, allocationId))));
	}
}
/**
 * Tests that a slot request is retried if it times out on the task manager side.
 */
@Test
public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
	final JobID jobId = new JobID();
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);
	final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
	final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
	final CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<>();
	// the n-th requestSlot call on the gateway is answered with the n-th future
	final Iterator<CompletableFuture<Acknowledge>> slotRequestFutureIterator = Arrays.asList(slotRequestFuture1, slotRequestFuture2).iterator();
	final ArrayBlockingQueue<SlotID> slotIds = new ArrayBlockingQueue<>(2);
	final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
		.setRequestSlotFunction(FunctionUtils.uncheckedFunction(
			requestSlotParameters -> {
				slotIds.put(requestSlotParameters.f0);
				return slotRequestFutureIterator.next();
			}))
		.createTestingTaskExecutorGateway();
	final ResourceID resourceId = ResourceID.generate();
	final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
	final SlotID slotId1 = new SlotID(resourceId, 0);
	final SlotID slotId2 = new SlotID(resourceId, 1);
	final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
	final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
	final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
	try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
		slotManager.registerTaskManager(taskManagerConnection, slotReport);
		slotManager.registerSlotRequest(slotRequest);
		final SlotID firstSlotId = slotIds.take();
		assertThat(slotIds, is(empty()));
		final TaskManagerSlot failedSlot = slotManager.getSlot(firstSlotId);
		// let the first attempt fail --> this should trigger a second attempt
		slotRequestFuture1.completeExceptionally(new SlotAllocationException("Test exception."));
		// the second attempt succeeds
		slotRequestFuture2.complete(Acknowledge.get());
		final SlotID secondSlotId = slotIds.take();
		assertThat(slotIds, is(empty()));
		final TaskManagerSlot slot = slotManager.getSlot(secondSlotId);
		assertEquals(TaskManagerSlot.State.ALLOCATED, slot.getState());
		assertEquals(allocationId, slot.getAllocationId());
		// the retry may target either slot; if it picked the other one, the failed slot must be free
		if (!failedSlot.getSlotId().equals(slot.getSlotId())) {
			assertEquals(TaskManagerSlot.State.FREE, failedSlot.getState());
		}
	}
}
/**
 * Tests that a pending slot request whose targeted slot is reported as allocated by a
 * different allocation (while the request is still in flight) is re-assigned to another
 * free slot of the same TaskExecutor.
 */
@Test
public void testSlotReportWhileActiveSlotRequest() throws Exception {
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
	final JobID jobId = new JobID();
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);
	final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
	// the first requestSlot call is answered with a future this test never completes (the
	// request stays in flight); the second call is acknowledged immediately
	final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
	final Iterator<CompletableFuture<Acknowledge>> slotRequestFutureIterator = Arrays.asList(
		slotRequestFuture1,
		CompletableFuture.completedFuture(Acknowledge.get())).iterator();
	final ArrayBlockingQueue<SlotID> slotIds = new ArrayBlockingQueue<>(2);
	final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
		.setRequestSlotFunction(FunctionUtils.uncheckedFunction(
			requestSlotParameters -> {
				slotIds.put(requestSlotParameters.f0);
				return slotRequestFutureIterator.next();
			}))
		.createTestingTaskExecutorGateway();
	final ResourceID resourceId = ResourceID.generate();
	final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
	final SlotID slotId1 = new SlotID(resourceId, 0);
	final SlotID slotId2 = new SlotID(resourceId, 1);
	final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
	final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
	final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
	final ScheduledExecutor mainThreadExecutor = TestingUtils.defaultScheduledExecutor();
	final SlotManagerImpl slotManager = createSlotManagerBuilder()
		.setScheduledExecutor(mainThreadExecutor)
		.build();
	try {
		slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
		final CompletableFuture<Void> registrationFuture = CompletableFuture.supplyAsync(
			() -> {
				slotManager.registerTaskManager(taskManagerConnection, slotReport);
				return null;
			},
			mainThreadExecutor)
			.thenAccept(
				(Object value) -> {
					try {
						slotManager.registerSlotRequest(slotRequest);
					} catch (ResourceManagerException e) {
						throw new RuntimeException("Could not register slots.", e);
					}
				});
		// check that no exception has been thrown
		registrationFuture.get();
		final SlotID requestedSlotId = slotIds.take();
		final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1;
		final SlotStatus newSlotStatus1 = new SlotStatus(requestedSlotId, resourceProfile, new JobID(), new AllocationID());
		final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
		final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
		final CompletableFuture<Boolean> reportSlotStatusFuture = CompletableFuture.supplyAsync(
			// this should update the slot with the pending slot request triggering the reassignment of it
			() -> slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport),
			mainThreadExecutor);
		assertTrue(reportSlotStatusFuture.get());
		final SlotID requestedSlotId2 = slotIds.take();
		assertEquals(freeSlotId, requestedSlotId2);
	} finally {
		// close on the main thread executor and wait for it, so that the SlotManager is shut
		// down before the test returns instead of lingering on the shared executor
		CompletableFuture.runAsync(
			ThrowingRunnable.unchecked(slotManager::close),
			mainThreadExecutor).get();
	}
}
/**
 * Tests that formerly used task managers can again time out after all of their slots have
 * been freed.
 */
@Test
public void testTimeoutForUnusedTaskManager() throws Exception {
	final long taskManagerTimeout = 50L;
	final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID))
		.build();
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final ResourceID resourceId = ResourceID.generate();
	final JobID jobId = new JobID();
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1);
	final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
	final CompletableFuture<SlotID> requestedSlotFuture = new CompletableFuture<>();
	final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
		.setRequestSlotFunction(tuple6 -> {
			requestedSlotFuture.complete(tuple6.f0);
			return CompletableFuture.completedFuture(Acknowledge.get());
		})
		.createTestingTaskExecutorGateway();
	final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
	final SlotID slotId1 = new SlotID(resourceId, 0);
	final SlotID slotId2 = new SlotID(resourceId, 1);
	final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
	final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
	final SlotReport initialSlotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
	final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
	try (final SlotManagerImpl slotManager = createSlotManagerBuilder()
			.setTaskManagerTimeout(Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))
			.build()) {
		slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
		// register the slot request first, then the TaskManager whose slots can fulfill it
		final CompletableFuture<Void> registrationFuture = CompletableFuture.supplyAsync(
			() -> {
				try {
					return slotManager.registerSlotRequest(slotRequest);
				} catch (ResourceManagerException e) {
					throw new CompletionException(e);
				}
			},
			mainThreadExecutor)
			.thenRun(() -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
		// surface registration failures here instead of blocking forever on requestedSlotFuture
		registrationFuture.get();
		final SlotID slotId = requestedSlotFuture.get();
		final CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
			() -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
			mainThreadExecutor);
		// check that the TaskManager is not idle
		assertFalse(idleFuture.get());
		final CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(
			() -> slotManager.getSlot(slotId),
			mainThreadExecutor);
		final TaskManagerSlot slot = slotFuture.get();
		assertEquals(TaskManagerSlot.State.ALLOCATED, slot.getState());
		assertEquals(allocationId, slot.getAllocationId());
		// freeing the only allocated slot makes the TaskManager idle, so it can time out again
		final CompletableFuture<Boolean> idleFuture2 = CompletableFuture.runAsync(
			() -> slotManager.freeSlot(slotId, allocationId),
			mainThreadExecutor)
			.thenApply((Object value) -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()));
		assertTrue(idleFuture2.get());
		assertThat(releasedResourceFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
	}
}
/**
 * Tests that a task manager timeout does not remove the slots from the SlotManager.
 * A timeout should only trigger the {@link ResourceActions#releaseResource(InstanceID, Exception)}
 * callback. The receiver of the callback can then decide what to do with the TaskManager.
 *
 * <p>See FLINK-7793
 */
@Test
public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
	final CompletableFuture<InstanceID> releasedInstanceFuture = new CompletableFuture<>();
	final ResourceActions releaseTrackingActions = new TestingResourceActionsBuilder()
		.setReleaseResourceConsumer((instanceId, ignored) -> releasedInstanceFuture.complete(instanceId))
		.build();

	final ResourceID taskExecutorResourceId = ResourceID.generate();
	final TaskExecutorConnection connection = new TaskExecutorConnection(
		taskExecutorResourceId,
		new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
	final SlotReport singleFreeSlotReport = new SlotReport(
		createEmptySlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.fromResources(1.0, 1)));

	try (final SlotManager slotManager = createSlotManagerBuilder()
			.setTaskManagerTimeout(Time.milliseconds(10L))
			.buildAndStartWithDirectExec(ResourceManagerId.generate(), releaseTrackingActions)) {
		slotManager.registerTaskManager(connection, singleFreeSlotReport);
		assertEquals(1, slotManager.getNumberRegisteredSlots());

		// block until the timeout callback fires; the slot must still be registered afterwards
		assertThat(releasedInstanceFuture.get(), is(connection.getInstanceID()));
		assertEquals(1, slotManager.getNumberRegisteredSlots());

		// only an explicit unregistration removes the slots
		slotManager.unregisterTaskManager(connection.getInstanceID(), TEST_EXCEPTION);
		assertEquals(0, slotManager.getNumberRegisteredSlots());
	}
}
/**
 * Tests that a free slot which is later reported as allocated won't be considered for
 * fulfilling other pending slot requests.
 *
 * <p>See: FLINK-8505
 */
@Test
public void testReportAllocatedSlot() throws Exception {
	final ResourceID taskManagerId = ResourceID.generate();
	final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(
		taskManagerId,
		new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());

	try (final SlotManagerImpl slotManager = createSlotManager(
			ResourceManagerId.generate(),
			new TestingResourceActionsBuilder().build())) {
		final SlotID slotId = new SlotID(taskManagerId, 0);

		// first the slot is reported as free ...
		slotManager.registerTaskManager(
			taskExecutorConnection,
			new SlotReport(new SlotStatus(slotId, ResourceProfile.ANY)));
		assertThat(slotManager.getNumberRegisteredSlots(), is(equalTo(1)));

		// ... then the very same slot is reported as allocated
		final SlotReport allocatedSlotReport = new SlotReport(
			new SlotStatus(slotId, ResourceProfile.ANY, new JobID(), new AllocationID()));
		slotManager.reportSlotStatus(taskExecutorConnection.getInstanceID(), allocatedSlotReport);

		// a new request must not be assigned to the allocated slot
		// (registering it triggered an IllegalStateException before)
		final AllocationID allocationId = new AllocationID();
		slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationId, ResourceProfile.UNKNOWN, "foobar"));
		assertThat(slotManager.getSlotRequest(allocationId).isAssigned(), is(false));
	}
}
/**
 * Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
 * fails.
 */
@Test
public void testSlotRequestFailure() throws Exception {
	try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(),
			new TestingResourceActionsBuilder().build())) {
		final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
		slotManager.registerSlotRequest(slotRequest);
		final BlockingQueue<Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
		final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
		// records every requestSlot call and answers it with the next manually controlled response
		final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
			.setRequestSlotFunction(requestSlotParameters -> {
				requestSlotQueue.offer(requestSlotParameters);
				try {
					return responseQueue.take();
				} catch (InterruptedException e) {
					// restore the interrupt flag instead of silently dropping it
					Thread.currentThread().interrupt();
					return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted.", e));
				}
			})
			.createTestingTaskExecutorGateway();
		final ResourceID taskExecutorResourceId = ResourceID.generate();
		final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
		final SlotReport slotReport = new SlotReport(createEmptySlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.ANY));
		final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
		responseQueue.offer(firstManualSlotRequestResponse);
		slotManager.registerTaskManager(taskExecutionConnection, slotReport);
		final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> firstRequest = requestSlotQueue.take();
		final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
		responseQueue.offer(secondManualSlotRequestResponse);
		// fail first request
		firstManualSlotRequestResponse.completeExceptionally(new SlotAllocationException("Test exception"));
		// the retry must target the same slot with the same allocation
		final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
		assertThat(secondRequest.f2, equalTo(firstRequest.f2));
		assertThat(secondRequest.f0, equalTo(firstRequest.f0));
		secondManualSlotRequestResponse.complete(Acknowledge.get());
		final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
		assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
		assertThat(slot.getAllocationId(), equalTo(secondRequest.f2));
	}
}
/**
 * Tests that a pending request is removed if the task executor reports a slot with its
 * allocation id.
 */
@Test
public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
	try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(),
			new TestingResourceActionsBuilder().build())) {
		final JobID jobID = new JobID();
		final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
		slotManager.registerSlotRequest(slotRequest1);
		final BlockingQueue<Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
		final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
		// records every requestSlot call and answers it with the next manually controlled response
		final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
			.setRequestSlotFunction(requestSlotParameters -> {
				requestSlotQueue.offer(requestSlotParameters);
				try {
					return responseQueue.take();
				} catch (InterruptedException e) {
					// restore the interrupt flag instead of silently dropping it
					Thread.currentThread().interrupt();
					return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted.", e));
				}
			})
			.createTestingTaskExecutorGateway();
		final ResourceID taskExecutorResourceId = ResourceID.generate();
		final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
		final SlotReport slotReport = new SlotReport(createEmptySlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.ANY));
		final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
		responseQueue.offer(firstManualSlotRequestResponse);
		slotManager.registerTaskManager(taskExecutionConnection, slotReport);
		final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> firstRequest = requestSlotQueue.take();
		final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
		responseQueue.offer(secondManualSlotRequestResponse);
		final SlotRequest slotRequest2 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
		slotManager.registerSlotRequest(slotRequest2);
		// fail first request
		firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
		final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
		// fail second request: the TaskExecutor reports the slot as occupied by slotRequest1's allocation
		secondManualSlotRequestResponse.completeExceptionally(new SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
		assertThat(firstRequest.f2, equalTo(slotRequest1.getAllocationId()));
		assertThat(secondRequest.f2, equalTo(slotRequest2.getAllocationId()));
		assertThat(secondRequest.f0, equalTo(firstRequest.f0));
		// (the former secondManualSlotRequestResponse.complete(...) call was a no-op on an
		// already exceptionally completed future and has been removed)
		final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
		assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
		assertThat(slot.getAllocationId(), equalTo(firstRequest.f2));
		assertThat(slotManager.getNumberRegisteredSlots(), is(1));
	}
}
/**
 * Tests that the job manager is notified about the failed allocations of a task manager
 * when that task manager is unregistered (e.g. because it failed or was killed).
 */
@Test
public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception {
	final Queue<Tuple2<JobID, AllocationID>> allocationFailures = new ArrayDeque<>(5);
	final TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setNotifyAllocationFailureConsumer(
			(Tuple3<JobID, AllocationID, Exception> failure) ->
				allocationFailures.offer(Tuple2.of(failure.f0, failure.f1)))
		.build();

	try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceManagerActions)) {
		// job1 requests two slots
		final JobID firstJobId = new JobID();
		final SlotRequest firstJobRequestA = createSlotRequest(firstJobId);
		final SlotRequest firstJobRequestB = createSlotRequest(firstJobId);
		slotManager.registerSlotRequest(firstJobRequestA);
		slotManager.registerSlotRequest(firstJobRequestB);

		// task-manager-1 with 2 slots; registering it triggers the slot allocation for job1
		final TaskExecutorConnection firstConnection = createTaskExecutorConnection();
		slotManager.registerTaskManager(firstConnection, createSlotReport(firstConnection.getResourceID(), 2));

		// job2 requests two slots, job3 requests one slot
		final JobID secondJobId = new JobID();
		final SlotRequest secondJobRequestA = createSlotRequest(secondJobId);
		final SlotRequest secondJobRequestB = createSlotRequest(secondJobId);
		slotManager.registerSlotRequest(secondJobRequestA);
		slotManager.registerSlotRequest(secondJobRequestB);
		final JobID thirdJobId = new JobID();
		final SlotRequest thirdJobRequest = createSlotRequest(thirdJobId);
		slotManager.registerSlotRequest(thirdJobRequest);

		// task-manager-2 with 3 slots; registering it triggers the slot allocation for job2 and job3
		final TaskExecutorConnection secondConnection = createTaskExecutorConnection();
		slotManager.registerTaskManager(secondConnection, createSlotReport(secondConnection.getResourceID(), 3));

		// unregistering task-manager-1 must fail exactly the two allocations of job1
		slotManager.unregisterTaskManager(firstConnection.getInstanceID(), TEST_EXCEPTION);
		assertThat(allocationFailures, hasSize(2));
		final Set<AllocationID> firstJobFailedAllocations = new HashSet<>(2);
		for (Tuple2<JobID, AllocationID> failure = allocationFailures.poll(); failure != null; failure = allocationFailures.poll()) {
			assertThat(failure.f0, equalTo(firstJobId));
			firstJobFailedAllocations.add(failure.f1);
		}
		assertThat(firstJobFailedAllocations, containsInAnyOrder(firstJobRequestA.getAllocationId(), firstJobRequestB.getAllocationId()));

		// unregistering task-manager-2 must fail the allocations of job2 and job3
		slotManager.unregisterTaskManager(secondConnection.getInstanceID(), TEST_EXCEPTION);
		assertThat(allocationFailures, hasSize(3));
		final Map<JobID, List<Tuple2<JobID, AllocationID>>> failuresByJob = allocationFailures.stream()
			.collect(Collectors.groupingBy(failure -> failure.f0));
		assertThat(failuresByJob.entrySet(), hasSize(2));
		final Set<AllocationID> secondJobFailedAllocations = extractFailedAllocationsForJob(secondJobId, failuresByJob);
		final Set<AllocationID> thirdJobFailedAllocations = extractFailedAllocationsForJob(thirdJobId, failuresByJob);
		assertThat(secondJobFailedAllocations, containsInAnyOrder(secondJobRequestA.getAllocationId(), secondJobRequestB.getAllocationId()));
		assertThat(thirdJobFailedAllocations, containsInAnyOrder(thirdJobRequest.getAllocationId()));
	}
}
/**
 * Collects the allocation IDs of all recorded allocation failures that belong to the given job.
 *
 * @param jobId job whose failed allocations should be extracted
 * @param failedAllocationsByJob recorded allocation failures grouped by job ID
 * @return the failed allocation IDs of {@code jobId}; empty if no failure was recorded for it,
 *     so that callers' assertions report a mismatch instead of failing with a NullPointerException
 */
private Set<AllocationID> extractFailedAllocationsForJob(JobID jobId, Map<JobID, List<Tuple2<JobID, AllocationID>>> failedAllocationsByJob) {
	final List<Tuple2<JobID, AllocationID>> failuresOfJob = failedAllocationsByJob.get(jobId);
	if (failuresOfJob == null) {
		return new HashSet<>();
	}
	return failuresOfJob.stream()
		.map(failure -> failure.f1)
		.collect(Collectors.toSet());
}
/** Creates a report of {@code numberSlots} slots with {@link ResourceProfile#ANY} and no allocation. */
@Nonnull
private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
	final BiFunction<SlotID, ResourceProfile, SlotStatus> freeSlotStatusFactory = SlotManagerImplTest::createEmptySlotStatus;
	return createSlotReport(taskExecutorResourceId, numberSlots, ResourceProfile.ANY, freeSlotStatusFactory);
}
/**
 * Creates a slot report for the slots {@code 0 .. numberSlots - 1} of the given TaskExecutor,
 * where each slot status is built by {@code slotStatusFactory} with {@code resourceProfile}.
 */
@Nonnull
private SlotReport createSlotReport(
		ResourceID taskExecutorResourceId,
		int numberSlots,
		ResourceProfile resourceProfile,
		BiFunction<SlotID, ResourceProfile, SlotStatus> slotStatusFactory) {
	final Set<SlotStatus> statuses = new HashSet<>(numberSlots);
	int slotIndex = 0;
	while (slotIndex < numberSlots) {
		final SlotID slotId = new SlotID(taskExecutorResourceId, slotIndex);
		statuses.add(slotStatusFactory.apply(slotId, resourceProfile));
		slotIndex++;
	}
	return new SlotReport(statuses);
}
/** Creates the status of the given slot without any job or allocation attached to it. */
private static SlotStatus createEmptySlotStatus(SlotID slotId, ResourceProfile resourceProfile) {
	final SlotStatus statusWithoutAllocation = new SlotStatus(slotId, resourceProfile);
	return statusWithoutAllocation;
}
/** Creates a slot request for the given job with {@link ResourceProfile#UNKNOWN}. */
@Nonnull
private SlotRequest createSlotRequest(JobID jobId) {
	final ResourceProfile unknownProfile = ResourceProfile.UNKNOWN;
	return createSlotRequest(jobId, unknownProfile);
}
/** Creates a slot request with a freshly generated {@link AllocationID} for the given job and profile. */
@Nonnull
private SlotRequest createSlotRequest(JobID jobId, ResourceProfile resourceProfile) {
	final AllocationID freshAllocationId = new AllocationID();
	return new SlotRequest(jobId, freshAllocationId, resourceProfile, "foobar1");
}
/** Creates and starts a slot manager configured with a single slot per worker. */
private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
	final int singleSlotPerWorker = 1;
	return createSlotManager(resourceManagerId, resourceManagerActions, singleSlotPerWorker);
}
/**
 * Builds a slot manager with the given number of slots per worker and starts it via
 * {@code buildAndStartWithDirectExec} with the given id and resource actions.
 */
private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions, int numSlotsPerWorker) {
	return createSlotManagerBuilder()
		.setNumSlotsPerWorker(numSlotsPerWorker)
		.buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions);
}
/** Returns a new builder whose default worker resource spec is {@code WORKER_RESOURCE_SPEC}. */
private SlotManagerBuilder createSlotManagerBuilder() {
	final SlotManagerBuilder builder = SlotManagerBuilder.newBuilder();
	return builder.setDefaultWorkerResourceSpec(WORKER_RESOURCE_SPEC);
}
/**
 * Tests that new resources/containers are only requested once all pending task manager
 * slots have been assigned.
 */
@Test
public void testRequestNewResources() throws Exception {
	final int numberSlots = 2;
	final AtomicInteger resourceRequests = new AtomicInteger(0);
	final TestingResourceActions countingResourceActions = new TestingResourceActionsBuilder()
		.setAllocateResourceFunction(ignored -> {
			resourceRequests.incrementAndGet();
			return true;
		})
		.build();

	try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(), countingResourceActions, numberSlots)) {
		final JobID jobId = new JobID();

		// the first request needs a new resource
		assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
		assertThat(resourceRequests.get(), is(1));

		// the previous resource was started with 2 slots, so the second request needs no new one
		assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
		assertThat(resourceRequests.get(), is(1));
		assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(2));

		// both pending slots are assigned now, so the third request needs another resource
		assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
		assertThat(resourceRequests.get(), is(2));
	}
}
/**
 * Tests that unregistering a slot request which was assigned to a pending task manager slot
 * returns that pending slot, i.e. it is no longer counted as assigned.
 */
@Test
public void testFailingAllocationReturnsPendingTaskManagerSlot() throws Exception {
	final int numberSlots = 2;
	try (final SlotManagerImpl slotManager = createSlotManager(
			ResourceManagerId.generate(),
			new TestingResourceActionsBuilder().build(),
			numberSlots)) {
		final SlotRequest slotRequest = createSlotRequest(new JobID());

		assertThat(slotManager.registerSlotRequest(slotRequest), is(true));
		assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
		assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));

		slotManager.unregisterSlotRequest(slotRequest.getAllocationId());

		// the pending slots remain, but none of them is assigned any more
		assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
		assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0));
	}
}
/**
 * Tests the completion of pending task manager slots by registering a TaskExecutor.
 */
@Test
public void testPendingTaskManagerSlotCompletion() throws Exception {
	final int numberSlots = 3;
	final ResourceProfile resourceProfile =
		SlotManagerImpl.generateDefaultSlotResourceProfile(WORKER_RESOURCE_SPEC, numberSlots);

	try (final SlotManagerImpl slotManager = createSlotManager(
			ResourceManagerId.generate(),
			new TestingResourceActionsBuilder().build(),
			numberSlots)) {
		// after a single request there are numberSlots pending slots, one of them assigned
		assertThat(slotManager.registerSlotRequest(createSlotRequest(new JobID(), resourceProfile)), is(true));
		assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
		assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
		assertThat(slotManager.getNumberRegisteredSlots(), is(0));

		// a TaskExecutor offering one slot less completes all but one pending slot
		final int offeredSlots = numberSlots - 1;
		final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
		slotManager.registerTaskManager(
			taskExecutorConnection,
			createSlotReport(taskExecutorConnection.getResourceID(), offeredSlots, resourceProfile, SlotManagerImplTest::createEmptySlotStatus));

		assertThat(slotManager.getNumberRegisteredSlots(), is(offeredSlots));
		assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1));
	}
}
/** Creates a connection to a default testing TaskExecutor gateway under a fresh resource ID. */
private TaskExecutorConnection createTaskExecutorConnection() {
	return createTaskExecutorConnection(new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
}
/** Wraps the given gateway into a connection with a freshly generated resource ID. */
private TaskExecutorConnection createTaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
	final ResourceID freshResourceId = ResourceID.generate();
	return new TaskExecutorConnection(freshResourceId, taskExecutorGateway);
}
/**
 * Tests that a registered slot with a different (here: larger) profile can fulfill a pending
 * slot request. The pending task manager slot that had been assigned to the request must no
 * longer be counted as assigned.
 */
@Test
public void testRegistrationOfDifferentSlot() throws Exception {
	final int numberSlots = 1;
	final int numberOfferedSlots = 1;
	final ResourceProfile requestedSlotProfile = ResourceProfile.fromResources(1.0, 1);
	final ResourceProfile offeredSlotProfile = ResourceProfile.fromResources(2.0, 2);

	try (final SlotManagerImpl slotManager = createSlotManager(
			ResourceManagerId.generate(),
			new TestingResourceActionsBuilder().build(),
			numberSlots)) {
		assertThat(slotManager.registerSlotRequest(createSlotRequest(new JobID(), requestedSlotProfile)), is(true));
		assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));

		final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
		slotManager.registerTaskManager(
			taskExecutorConnection,
			createSlotReport(
				taskExecutorConnection.getResourceID(),
				numberOfferedSlots,
				offeredSlotProfile,
				SlotManagerImplTest::createEmptySlotStatus));

		assertThat(slotManager.getNumberRegisteredSlots(), is(numberOfferedSlots));
		assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
		assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0));
	}
}
/**
 * Tests that only free slots can fulfill/complete a pending task manager slot.
 */
@Test
public void testOnlyFreeSlotsCanFulfillPendingTaskManagerSlot() throws Exception {
	final int numberSlots = 1;
	try (final SlotManagerImpl slotManager = createSlotManager(
			ResourceManagerId.generate(),
			new TestingResourceActionsBuilder().build(),
			numberSlots)) {
		final JobID jobId = new JobID();
		assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));

		// register a TaskExecutor whose only slot is already allocated
		final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
		final SlotStatus allocatedSlotStatus = new SlotStatus(
			new SlotID(taskExecutorConnection.getResourceID(), 0),
			ResourceProfile.ANY,
			jobId,
			new AllocationID());
		slotManager.registerTaskManager(taskExecutorConnection, new SlotReport(allocatedSlotStatus));

		// the slot is registered, but the pending slot stays pending and assigned
		assertThat(slotManager.getNumberRegisteredSlots(), is(1));
		assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
		assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
	}
}
/**
 * Tests that the cause passed to unregisterTaskManager is forwarded when failing allocations.
 */
@Test
public void unregisterTaskManager_withAllocatedSlot_failsAllocationsWithCause() throws Exception {
	final FlinkException failureCause = new FlinkException("unregisterTaskManager test exception.");
	final CompletableFuture<Exception> allocationFailureCause = new CompletableFuture<>();
	final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
		.setNotifyAllocationFailureConsumer(failure -> allocationFailureCause.complete(failure.f2))
		.build();

	try (SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) {
		final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
		slotManager.registerTaskManager(
			taskExecutorConnection,
			createSingleAllocatedSlotReport(taskExecutorConnection.getResourceID(), new JobID()));

		slotManager.unregisterTaskManager(taskExecutorConnection.getInstanceID(), failureCause);

		assertThat(allocationFailureCause.get(), CoreMatchers.containsCause(failureCause));
	}
}
/** Creates a report with one slot ({@link ResourceProfile#ANY}) allocated to {@code jobId} under a fresh allocation. */
private SlotReport createSingleAllocatedSlotReport(ResourceID resourceID, JobID jobId) {
	final BiFunction<SlotID, ResourceProfile, SlotStatus> allocatedSlotFactory =
		(slotId, profile) -> new SlotStatus(slotId, profile, jobId, new AllocationID());
	return createSlotReport(resourceID, 1, ResourceProfile.ANY, allocatedSlotFactory);
}
/**
 * The spread out slot allocation strategy should spread out the allocated
 * slots across all available TaskExecutors. See FLINK-12122.
 */
@Test
public void testSpreadOutSlotAllocationStrategy() throws Exception {
	try (SlotManagerImpl slotManager = createSlotManagerBuilder()
			.setSlotMatchingStrategy(LeastUtilizationSlotMatchingStrategy.INSTANCE)
			.buildAndStartWithDirectExec(ResourceManagerId.generate(), new TestingResourceActionsBuilder().build())) {
		final int numberTaskExecutors = 5;
		final List<CompletableFuture<JobID>> requestSlotFutures = new ArrayList<>();

		// register n TaskExecutors with 2 slots each
		for (int taskExecutorIndex = 0; taskExecutorIndex < numberTaskExecutors; taskExecutorIndex++) {
			final CompletableFuture<JobID> requestSlotFuture = new CompletableFuture<>();
			requestSlotFutures.add(requestSlotFuture);
			registerTaskExecutorWithTwoSlots(slotManager, requestSlotFuture);
		}

		// request as many slots as there are TaskExecutors
		final JobID jobId = new JobID();
		for (int requestIndex = 0; requestIndex < numberTaskExecutors; requestIndex++) {
			assertTrue(slotManager.registerSlotRequest(createSlotRequest(jobId)));
		}

		// every TaskExecutor must have received a slot request, all of them for the same job
		final Set<JobID> jobIds = new HashSet<>(
			FutureUtils.combineAll(requestSlotFutures).get(10L, TimeUnit.SECONDS));
		assertThat(jobIds, hasSize(1));
		assertThat(jobIds, containsInAnyOrder(jobId));
	}
}
/**
 * Tests that the slot manager respects the maximum number of slots when allocating new resources:
 * once the limit is reached, further slot requests are accepted but do not trigger new allocations.
 */
@Test
public void testMaxSlotLimitAllocateResource() throws Exception {
	final int slotsPerWorker = 1;
	final int maxSlotNum = 1;
	final JobID jobId = new JobID();
	final AtomicInteger allocateResourceCalls = new AtomicInteger();

	// Count every attempt by the slot manager to allocate a new worker; each attempt reports success.
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setAllocateResourceFunction(ignored -> {
			allocateResourceCalls.incrementAndGet();
			return true;
		})
		.build();

	try (SlotManagerImpl slotManager = createSlotManagerBuilder()
			.setNumSlotsPerWorker(slotsPerWorker)
			.setMaxSlotNum(maxSlotNum)
			.buildAndStartWithDirectExec(ResourceManagerId.generate(), resourceManagerActions)) {

		// The first request fits into the limit and triggers exactly one allocation.
		assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(createSlotRequest(jobId)));
		assertThat(allocateResourceCalls.get(), is(1));

		// The second request is accepted, but no further allocation is attempted because of the max limit.
		assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(createSlotRequest(jobId)));
		assertThat(allocateResourceCalls.get(), is(1));
	}
}
/**
 * Tests that the slot manager releases a newly registered TaskExecutor when registering its slots
 * would exceed the configured maximum number of slots.
 */
@Test
public void testMaxSlotLimitRegisterResource() throws Exception {
	final int numberSlots = 1;
	final int maxSlotNum = 1;
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
	final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
	ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID))
		.build();
	final TaskExecutorGateway taskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
	final TaskExecutorGateway taskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
	final ResourceID resourceId1 = ResourceID.generate();
	final ResourceID resourceId2 = ResourceID.generate();
	final TaskExecutorConnection taskManagerConnection1 = new TaskExecutorConnection(resourceId1, taskExecutorGateway1);
	final TaskExecutorConnection taskManagerConnection2 = new TaskExecutorConnection(resourceId2, taskExecutorGateway2);
	// Each reported slot must carry the ResourceID of the TaskExecutor that reports it.
	final SlotID slotId1 = new SlotID(resourceId1, 0);
	final SlotID slotId2 = new SlotID(resourceId2, 0);
	final SlotStatus slotStatus1 = new SlotStatus(slotId1, ResourceProfile.UNKNOWN);
	final SlotStatus slotStatus2 = new SlotStatus(slotId2, ResourceProfile.UNKNOWN);
	final SlotReport slotReport1 = new SlotReport(Collections.singletonList(slotStatus1));
	final SlotReport slotReport2 = new SlotReport(Collections.singletonList(slotStatus2));
	try (SlotManagerImpl slotManager = createSlotManagerBuilder()
			.setNumSlotsPerWorker(numberSlots)
			.setMaxSlotNum(maxSlotNum)
			.buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) {
		slotManager.registerTaskManager(taskManagerConnection1, slotReport1);
		// Accepting this second worker's slot would exceed maxSlotNum, so it is expected to be rejected.
		slotManager.registerTaskManager(taskManagerConnection2, slotReport2);
		assertThat("The number registered slots does not equal the expected number.", slotManager.getNumberRegisteredSlots(), is(1));
		assertNotNull(slotManager.getSlot(slotId1));
		// The second registered task manager should be released. The wait is bounded so a
		// regression fails the test instead of hanging it.
		assertThat(releasedResourceFuture.get(10L, TimeUnit.SECONDS), is(equalTo(taskManagerConnection2.getInstanceID())));
	}
}
/**
 * Registers a new TaskExecutor reporting two slots at the given slot manager.
 * The supplied future is completed with the job id of the first slot request
 * the TaskExecutor receives; completing an already-completed future is a no-op,
 * so later requests leave it untouched. Every request is acknowledged.
 */
private void registerTaskExecutorWithTwoSlots(SlotManagerImpl slotManager, CompletableFuture<JobID> firstRequestSlotFuture) {
	final TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
		.setRequestSlotFunction(requestSlotParameters -> {
			firstRequestSlotFuture.complete(requestSlotParameters.f1);
			return CompletableFuture.completedFuture(Acknowledge.get());
		})
		.createTestingTaskExecutorGateway();
	final TaskExecutorConnection connection = createTaskExecutorConnection(gateway);
	slotManager.registerTaskManager(connection, createSlotReport(connection.getResourceID(), 2));
}
/**
 * Tests that, for a worker whose resources are exactly {@code numSlots} times a per-slot
 * profile, the generated default slot resource profile equals that per-slot profile.
 */
@Test
public void testGenerateDefaultSlotProfile() {
	final int numSlots = 5;
	final double cpuPerSlot = 1.0;
	final int taskHeapMbPerSlot = 1;
	final int taskOffHeapMbPerSlot = 2;
	final int networkMbPerSlot = 3;
	final int managedMbPerSlot = 4;

	final ResourceProfile expectedSlotProfile = ResourceProfile.newBuilder()
		.setCpuCores(cpuPerSlot)
		.setTaskHeapMemoryMB(taskHeapMbPerSlot)
		.setTaskOffHeapMemoryMB(taskOffHeapMbPerSlot)
		.setNetworkMemoryMB(networkMbPerSlot)
		.setManagedMemoryMB(managedMbPerSlot)
		.build();

	final WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder()
		.setCpuCores(cpuPerSlot * numSlots)
		.setTaskHeapMemoryMB(taskHeapMbPerSlot * numSlots)
		.setTaskOffHeapMemoryMB(taskOffHeapMbPerSlot * numSlots)
		.setNetworkMemoryMB(networkMbPerSlot * numSlots)
		.setManagedMemoryMB(managedMbPerSlot * numSlots)
		.build();

	final ResourceProfile generatedSlotProfile =
		SlotManagerImpl.generateDefaultSlotResourceProfile(workerResourceSpec, numSlots);
	assertThat(generatedSlotProfile, is(expectedSlotProfile));
}
}