[FLINK-15247][Runtime] Wait for all slots to be free before task executor services shutdown upon stopping
[FLINK-15247][Runtime] Improve test coverage for slot allocation and task submission in TaskSlotTable
[hotfix][Runtime] Introduce TaskSlotTable interface
[hotfix][Runtime][Tests] Introduce StubTaskSlotTable for tests to replace mockito mocks
[hotfix][Runtime][Tests] Rework TaskExecutorTest#testTaskInterruptionAndTerminationOnShutdown to testTaskSlotTableTerminationOnShutdown
This closes #10682.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fe272ef..6de6747 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -139,7 +139,6 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -188,8 +187,6 @@
/** The kvState registration service in the task manager. */
private final KvStateService kvStateService;
- private final TaskCompletionTracker taskCompletionTracker;
-
// --------- job manager connections -----------
private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
@@ -279,7 +276,6 @@
this.resourceManagerAddress = null;
this.resourceManagerConnection = null;
this.currentRegistrationTimeoutId = null;
- this.taskCompletionTracker = new TaskCompletionTracker();
final ResourceID resourceId = taskExecutorServices.getTaskManagerLocation().getResourceID();
this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
@@ -330,7 +326,7 @@
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
- taskSlotTable.start(new SlotActionsImpl());
+ taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
@@ -376,7 +372,7 @@
return FutureUtils
.runAfterwards(
- taskCompletionTracker.failIncompleteTasksAndGetTerminationFuture(),
+ taskSlotTable.closeAsync(),
this::stopTaskExecutorServices)
.handle((ignored, throwable) -> {
handleOnStopException(throwableBeforeTasksCompletion, throwable);
@@ -606,7 +602,6 @@
if (taskAdded) {
task.startTaskThread();
- taskCompletionTracker.trackTaskCompletion(task);
setupResultPartitionBookkeeping(
tdd.getJobId(),
@@ -1856,30 +1851,4 @@
return new TaskExecutorHeartbeatPayload(taskSlotTable.createSlotReport(getResourceID()), partitionTracker.createClusterPartitionReport());
}
}
-
- private static class TaskCompletionTracker {
- private final Map<ExecutionAttemptID, Task> incompleteTasks;
-
- private TaskCompletionTracker() {
- incompleteTasks = new ConcurrentHashMap<>(8);
- }
-
- void trackTaskCompletion(Task task) {
- incompleteTasks.put(task.getExecutionId(), task);
- task.getTerminationFuture().thenRun(() -> incompleteTasks.remove(task.getExecutionId()));
- }
-
- CompletableFuture<Void> failIncompleteTasksAndGetTerminationFuture() {
- FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
- return FutureUtils.waitForAll(
- incompleteTasks
- .values()
- .stream()
- .map(task -> {
- task.failExternally(cause);
- return task.getTerminationFuture();
- })
- .collect(Collectors.toList()));
- }
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 1d140b2..45d6f2c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -33,6 +33,7 @@
import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -182,7 +183,7 @@
}
try {
- taskSlotTable.stop();
+ taskSlotTable.close();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
@@ -287,7 +288,7 @@
final TimerService<AllocationID> timerService = new TimerService<>(
new ScheduledThreadPoolExecutor(1),
timerServiceShutdownTimeout);
- return new TaskSlotTable<>(
+ return new TaskSlotTableImpl<>(
numberOfSlots,
TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(taskExecutorResourceSpec),
TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(taskExecutorResourceSpec, numberOfSlots),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 90e640d..779d6a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -22,8 +22,11 @@
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -33,6 +36,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
/**
* Container for multiple {@link TaskSlotPayload tasks} belonging to the same slot. A {@link TaskSlot} can be in one
@@ -55,7 +60,7 @@
*
* @param <T> type of the {@link TaskSlotPayload} stored in this slot
*/
-public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseable {
+public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseableAsync {
private static final Logger LOG = LoggerFactory.getLogger(TaskSlot.class);
/** Index of the task slot. */
@@ -78,6 +83,9 @@
/** Allocation id of this slot. */
private final AllocationID allocationId;
+ /** The closing future is completed when the slot is freed and closed. */
+ private final CompletableFuture<Void> closingFuture;
+
public TaskSlot(
final int index,
final ResourceProfile resourceProfile,
@@ -95,6 +103,8 @@
this.allocationId = allocationId;
this.memoryManager = createMemoryManager(resourceProfile, memoryPageSize);
+
+ this.closingFuture = new CompletableFuture<>();
}
// ----------------------------------------------------------------------------------
@@ -245,16 +255,6 @@
}
/**
- * Mark this slot as releasing. A slot can always be marked as releasing.
- *
- * @return True
- */
- public boolean markReleasing() {
- state = TaskSlotState.RELEASING;
- return true;
- }
-
- /**
* Generate the slot offer from this TaskSlot.
*
* @return The sot offer which this task slot can provide
@@ -274,9 +274,39 @@
}
@Override
- public void close() {
- verifyMemoryFreed();
- this.memoryManager.shutdown();
+ public CompletableFuture<Void> closeAsync() {
+ return closeAsync(new FlinkException("Closing the slot"));
+ }
+
+ /**
+ * Close the task slot asynchronously.
+ *
+ * <p>Slot is moved to {@link TaskSlotState#RELEASING} state and only once.
+ * If there are active tasks running in the slot then they are failed.
+ * The future of all tasks terminated and slot cleaned up is initiated only once and always returned
+ * in case of multiple attempts to close the slot.
+ *
+ * @param cause cause of closing
+ * @return future of all running task if any being done and slot cleaned up.
+ */
+ CompletableFuture<Void> closeAsync(Throwable cause) {
+ if (!isReleasing()) {
+ state = TaskSlotState.RELEASING;
+ if (!isEmpty()) {
+ // we couldn't free the task slot because it still contains task, fail the tasks
+ // and set the slot state to releasing so that it gets eventually freed
+ tasks.values().forEach(task -> task.failExternally(cause));
+ }
+ final CompletableFuture<Void> cleanupFuture = FutureUtils
+ .waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
+ .thenRun(() -> {
+ verifyMemoryFreed();
+ this.memoryManager.shutdown();
+ });
+
+ FutureUtils.forward(cleanupFuture, closingFuture);
+ }
+ return closingFuture;
}
private void verifyMemoryFreed() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 00b9691..4fe033c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -22,29 +22,17 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceBudgetManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.util.AutoCloseableAsync;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
@@ -57,111 +45,14 @@
*
* <p>Before the task slot table can be used, it must be started via the {@link #start} method.
*/
-public class TaskSlotTable<T extends TaskSlotPayload> implements TimeoutListener<AllocationID> {
-
- private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);
-
- /**
- * Number of slots in static slot allocation.
- * If slot is requested with an index, the requested index must within the range of [0, numberSlots).
- * When generating slot report, we should always generate slots with index in [0, numberSlots) even the slot does not exist.
- */
- private final int numberSlots;
-
- /** Slot resource profile for static slot allocation. */
- private final ResourceProfile defaultSlotResourceProfile;
-
- /** Page size for memory manager. */
- private final int memoryPageSize;
-
- /** Timer service used to time out allocated slots. */
- private final TimerService<AllocationID> timerService;
-
- /** The list of all task slots. */
- private final Map<Integer, TaskSlot<T>> taskSlots;
-
- /** Mapping from allocation id to task slot. */
- private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
-
- /** Mapping from execution attempt id to task and task slot. */
- private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;
-
- /** Mapping from job id to allocated slots for a job. */
- private final Map<JobID, Set<AllocationID>> slotsPerJob;
-
- /** Interface for slot actions, such as freeing them or timing them out. */
- private SlotActions slotActions;
-
- /** Whether the table has been started. */
- private volatile boolean started;
-
- private final ResourceBudgetManager budgetManager;
-
- public TaskSlotTable(
- final int numberSlots,
- final ResourceProfile totalAvailableResourceProfile,
- final ResourceProfile defaultSlotResourceProfile,
- final int memoryPageSize,
- final TimerService<AllocationID> timerService) {
-
- Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0.");
-
- this.numberSlots = numberSlots;
- this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
- this.memoryPageSize = memoryPageSize;
-
- this.taskSlots = new HashMap<>(numberSlots);
-
- this.timerService = Preconditions.checkNotNull(timerService);
-
- budgetManager = new ResourceBudgetManager(Preconditions.checkNotNull(totalAvailableResourceProfile));
-
- allocatedSlots = new HashMap<>(numberSlots);
-
- taskSlotMappings = new HashMap<>(4 * numberSlots);
-
- slotsPerJob = new HashMap<>(4);
-
- slotActions = null;
- started = false;
- }
-
+public interface TaskSlotTable<T extends TaskSlotPayload> extends TimeoutListener<AllocationID>, AutoCloseableAsync {
/**
* Start the task slot table with the given slot actions.
*
* @param initialSlotActions to use for slot actions
+ * @param mainThreadExecutor {@link ComponentMainThreadExecutor} to schedule internal calls to the main thread
*/
- public void start(SlotActions initialSlotActions) {
- this.slotActions = Preconditions.checkNotNull(initialSlotActions);
-
- timerService.start(this);
-
- started = true;
- }
-
- /**
- * Stop the task slot table.
- */
- public void stop() {
- started = false;
- timerService.stop();
- allocatedSlots
- .values()
- .stream()
- .filter(slot -> !taskSlots.containsKey(slot.getIndex()))
- .forEach(TaskSlot::close);
- allocatedSlots.clear();
- taskSlots.values().forEach(TaskSlot::close);
- taskSlots.clear();
- slotActions = null;
- }
-
- @VisibleForTesting
- public boolean isStopped() {
- return !started &&
- taskSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown()) &&
- allocatedSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown());
- }
+ void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor);
/**
* Returns the all {@link AllocationID} for the given job.
@@ -169,65 +60,9 @@
* @param jobId for which to return the set of {@link AllocationID}.
* @return Set of {@link AllocationID} for the given job
*/
- public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
- final Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+ Set<AllocationID> getAllocationIdsPerJob(JobID jobId);
- if (allocationIds == null) {
- return Collections.emptySet();
- } else {
- return Collections.unmodifiableSet(allocationIds);
- }
- }
-
- // ---------------------------------------------------------------------
- // Slot report methods
- // ---------------------------------------------------------------------
-
- public SlotReport createSlotReport(ResourceID resourceId) {
- List<SlotStatus> slotStatuses = new ArrayList<>();
-
- for (int i = 0; i < numberSlots; i++) {
- SlotID slotId = new SlotID(resourceId, i);
- SlotStatus slotStatus;
- if (taskSlots.containsKey(i)) {
- TaskSlot<T> taskSlot = taskSlots.get(i);
-
- slotStatus = new SlotStatus(
- slotId,
- taskSlot.getResourceProfile(),
- taskSlot.getJobId(),
- taskSlot.getAllocationId());
- } else {
- slotStatus = new SlotStatus(
- slotId,
- defaultSlotResourceProfile,
- null,
- null);
- }
-
- slotStatuses.add(slotStatus);
- }
-
- for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
- if (taskSlot.getIndex() < 0) {
- SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
- SlotStatus slotStatus = new SlotStatus(
- slotID,
- taskSlot.getResourceProfile(),
- taskSlot.getJobId(),
- taskSlot.getAllocationId());
- slotStatuses.add(slotStatus);
- }
- }
-
- final SlotReport slotReport = new SlotReport(slotStatuses);
-
- return slotReport;
- }
-
- // ---------------------------------------------------------------------
- // Slot methods
- // ---------------------------------------------------------------------
+ SlotReport createSlotReport(ResourceID resourceId);
/**
* Allocate the slot with the given index for the given job and allocation id. If negative index is
@@ -241,9 +76,7 @@
* @return True if the task slot could be allocated; otherwise false
*/
@VisibleForTesting
- public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
- return allocateSlot(index, jobId, allocationId, defaultSlotResourceProfile, slotTimeout);
- }
+ boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout);
/**
* Allocate the slot with the given index for the given job and allocation id. If negative index is
@@ -257,64 +90,12 @@
* @param slotTimeout until the slot times out
* @return True if the task slot could be allocated; otherwise false
*/
- public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
- checkInit();
-
- Preconditions.checkArgument(index < numberSlots);
-
- TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
- if (taskSlot != null) {
- LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
- return false;
- }
-
- if (taskSlots.containsKey(index)) {
- TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
- LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
- index,
- duplicatedTaskSlot.getResourceProfile(),
- duplicatedTaskSlot.getJobId(),
- duplicatedTaskSlot.getAllocationId());
- return duplicatedTaskSlot.getJobId().equals(jobId) &&
- duplicatedTaskSlot.getAllocationId().equals(allocationId);
- } else if (allocatedSlots.containsKey(allocationId)) {
- return true;
- }
-
- resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
-
- if (!budgetManager.reserve(resourceProfile)) {
- LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "
- + "while the currently remaining available resources are {}, total is {}.",
- resourceProfile,
- budgetManager.getAvailableBudget(),
- budgetManager.getTotalBudget());
- return false;
- }
-
- taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
- if (index >= 0) {
- taskSlots.put(index, taskSlot);
- }
-
- // update the allocation id to task slot map
- allocatedSlots.put(allocationId, taskSlot);
-
- // register a timeout for this slot since it's in state allocated
- timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
-
- // add this slot to the set of job slots
- Set<AllocationID> slots = slotsPerJob.get(jobId);
-
- if (slots == null) {
- slots = new HashSet<>(4);
- slotsPerJob.put(jobId, slots);
- }
-
- slots.add(allocationId);
-
- return true;
- }
+ boolean allocateSlot(
+ int index,
+ JobID jobId,
+ AllocationID allocationId,
+ ResourceProfile resourceProfile,
+ Time slotTimeout);
/**
* Marks the slot under the given allocation id as active. If the slot could not be found, then
@@ -324,26 +105,7 @@
* @throws SlotNotFoundException if the slot could not be found for the given allocation id
* @return True if the slot could be marked active; otherwise false
*/
- public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
- checkInit();
-
- TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
- if (taskSlot != null) {
- if (taskSlot.markActive()) {
- // unregister a potential timeout
- LOG.info("Activate slot {}.", allocationId);
-
- timerService.unregisterTimeout(allocationId);
-
- return true;
- } else {
- return false;
- }
- } else {
- throw new SlotNotFoundException(allocationId);
- }
- }
+ boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException;
/**
* Marks the slot under the given allocation id as inactive. If the slot could not be found,
@@ -354,24 +116,7 @@
* @throws SlotNotFoundException if the slot could not be found for the given allocation id
* @return True if the slot could be marked inactive
*/
- public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
- checkInit();
-
- TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
- if (taskSlot != null) {
- if (taskSlot.markInactive()) {
- // register a timeout to free the slot
- timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
-
- return true;
- } else {
- return false;
- }
- } else {
- throw new SlotNotFoundException(allocationId);
- }
- }
+ boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException;
/**
* Try to free the slot. If the slot is empty it will set the state of the task slot to free
@@ -382,7 +127,7 @@
* @throws SlotNotFoundException if there is not task slot for the given allocation id
* @return Index of the freed slot if the slot could be freed; otherwise -1
*/
- public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+ default int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
return freeSlot(allocationId, new Exception("The task slot of this task is being freed."));
}
@@ -394,64 +139,9 @@
* @param allocationId identifying the task slot to be freed
* @param cause to fail the tasks with if slot is not empty
* @throws SlotNotFoundException if there is not task slot for the given allocation id
- * @return The freed TaskSlot. If the TaskSlot cannot be freed then null.
+ * @return Index of the freed slot if the slot could be freed; otherwise -1
*/
- public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
- checkInit();
-
- TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
- if (taskSlot != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Free slot {}.", taskSlot, cause);
- } else {
- LOG.info("Free slot {}.", taskSlot);
- }
-
- final JobID jobId = taskSlot.getJobId();
-
- if (taskSlot.isEmpty()) {
- // remove the allocation id to task slot mapping
- allocatedSlots.remove(allocationId);
-
- // unregister a potential timeout
- timerService.unregisterTimeout(allocationId);
-
- Set<AllocationID> slots = slotsPerJob.get(jobId);
-
- if (slots == null) {
- throw new IllegalStateException("There are no more slots allocated for the job " + jobId +
- ". This indicates a programming bug.");
- }
-
- slots.remove(allocationId);
-
- if (slots.isEmpty()) {
- slotsPerJob.remove(jobId);
- }
-
- taskSlot.close();
- taskSlots.remove(taskSlot.getIndex());
- budgetManager.release(taskSlot.getResourceProfile());
-
- return taskSlot.getIndex();
- } else {
- // we couldn't free the task slot because it still contains task, fail the tasks
- // and set the slot state to releasing so that it gets eventually freed
- taskSlot.markReleasing();
-
- Iterator<T> taskIterator = taskSlot.getTasks();
-
- while (taskIterator.hasNext()) {
- taskIterator.next().failExternally(cause);
- }
-
- return -1;
- }
- } else {
- throw new SlotNotFoundException(allocationId);
- }
- }
+ int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException;
/**
* Check whether the timeout with ticket is valid for the given allocation id.
@@ -460,11 +150,7 @@
* @param ticket of the timeout
* @return True if the timeout is valid; otherwise false
*/
- public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
- checkInit();
-
- return timerService.isValid(allocationId, ticket);
- }
+ boolean isValidTimeout(AllocationID allocationId, UUID ticket);
/**
* Check whether the slot for the given index is allocated for the given job and allocation id.
@@ -474,16 +160,7 @@
* @param allocationId which should match the task slot's allocation id
* @return True if the given task slot is allocated for the given job and allocation id
*/
- public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
- TaskSlot<T> taskSlot = taskSlots.get(index);
- if (taskSlot != null) {
- return taskSlot.isAllocated(jobId, allocationId);
- } else if (index < 0) {
- return allocatedSlots.containsKey(allocationId);
- } else {
- return false;
- }
- }
+ boolean isAllocated(int index, JobID jobId, AllocationID allocationId);
/**
* Try to mark the specified slot as active if it has been allocated by the given job.
@@ -492,15 +169,7 @@
* @param allocationId identifying the allocation
* @return True if the task slot could be marked active.
*/
- public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
- TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
- if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
- return taskSlot.markActive();
- } else {
- return false;
- }
- }
+ boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId);
/**
* Check whether the task slot with the given index is free.
@@ -508,9 +177,7 @@
* @param index of the task slot
* @return True if the task slot is free; otherwise false
*/
- public boolean isSlotFree(int index) {
- return !taskSlots.containsKey(index);
- }
+ boolean isSlotFree(int index);
/**
* Check whether the job has allocated (not active) slots.
@@ -518,9 +185,7 @@
* @param jobId for which to check for allocated slots
* @return True if there are allocated slots for the given job id.
*/
- public boolean hasAllocatedSlots(JobID jobId) {
- return getAllocatedSlots(jobId).hasNext();
- }
+ boolean hasAllocatedSlots(JobID jobId);
/**
* Return an iterator of allocated slots for the given job id.
@@ -528,9 +193,7 @@
* @param jobId for which to return the allocated slots
* @return Iterator of allocated slots.
*/
- public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
- return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
- }
+ Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId);
/**
* Return an iterator of active slots (their application ids) for the given job id.
@@ -538,9 +201,7 @@
* @param jobId for which to return the active slots
* @return Iterator of allocation ids of active slots
*/
- public Iterator<AllocationID> getActiveSlots(JobID jobId) {
- return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
- }
+ Iterator<AllocationID> getActiveSlots(JobID jobId);
/**
* Returns the owning job of the {@link TaskSlot} identified by the
@@ -551,19 +212,7 @@
* the given allocation id or if the slot has no owning job assigned
*/
@Nullable
- public JobID getOwningJob(AllocationID allocationId) {
- final TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
- if (taskSlot != null) {
- return taskSlot.getJobId();
- } else {
- return null;
- }
- }
-
- // ---------------------------------------------------------------------
- // Task methods
- // ---------------------------------------------------------------------
+ JobID getOwningJob(AllocationID allocationId);
/**
* Add the given task to the slot identified by the task's allocation id.
@@ -573,27 +222,7 @@
* @throws SlotNotActiveException if there was no slot active for task's job and allocation id
* @return True if the task could be added to the task slot; otherwise false
*/
- public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
- Preconditions.checkNotNull(task);
-
- TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
-
- if (taskSlot != null) {
- if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
- if (taskSlot.add(task)) {
- taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));
-
- return true;
- } else {
- return false;
- }
- } else {
- throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
- }
- } else {
- throw new SlotNotFoundException(task.getAllocationId());
- }
- }
+ boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException;
/**
* Remove the task with the given execution attempt id from its task slot. If the owning task
@@ -603,26 +232,7 @@
* @param executionAttemptID identifying the task to remove
* @return The removed task if there is any for the given execution attempt id; otherwise null
*/
- public T removeTask(ExecutionAttemptID executionAttemptID) {
- checkInit();
-
- TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
-
- if (taskSlotMapping != null) {
- T task = taskSlotMapping.getTask();
- TaskSlot<T> taskSlot = taskSlotMapping.getTaskSlot();
-
- taskSlot.remove(task.getExecutionId());
-
- if (taskSlot.isReleasing() && taskSlot.isEmpty()) {
- slotActions.freeSlot(taskSlot.getAllocationId());
- }
-
- return task;
- } else {
- return null;
- }
- }
+ T removeTask(ExecutionAttemptID executionAttemptID);
/**
* Get the task for the given execution attempt id. If none could be found, then return null.
@@ -630,15 +240,7 @@
* @param executionAttemptID identifying the requested task
* @return The task for the given execution attempt id if it exist; otherwise null
*/
- public T getTask(ExecutionAttemptID executionAttemptID) {
- TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.get(executionAttemptID);
-
- if (taskSlotMapping != null) {
- return taskSlotMapping.getTask();
- } else {
- return null;
- }
- }
+ T getTask(ExecutionAttemptID executionAttemptID);
/**
* Return an iterator over all tasks for a given job.
@@ -646,9 +248,7 @@
* @param jobId identifying the job of the requested tasks
* @return Iterator over all task for a given job
*/
- public Iterator<T> getTasks(JobID jobId) {
- return new TaskIterator(jobId);
- }
+ Iterator<T> getTasks(JobID jobId);
/**
* Get the current allocation for the task slot with the given index.
@@ -656,13 +256,7 @@
* @param index identifying the slot for which the allocation id shall be retrieved
* @return Allocation id of the specified slot if allocated; otherwise null
*/
- public AllocationID getCurrentAllocation(int index) {
- TaskSlot<T> taskSlot = taskSlots.get(index);
- if (taskSlot == null) {
- return null;
- }
- return taskSlot.getAllocationId();
- }
+ AllocationID getCurrentAllocation(int index);
/**
* Get the memory manager of the slot allocated for the task.
@@ -670,217 +264,5 @@
* @param allocationID allocation id of the slot allocated for the task
* @return the memory manager of the slot allocated for the task
*/
- public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
- TaskSlot<T> taskSlot = getTaskSlot(allocationID);
- if (taskSlot != null) {
- return taskSlot.getMemoryManager();
- } else {
- throw new SlotNotFoundException(allocationID);
- }
- }
-
- // ---------------------------------------------------------------------
- // TimeoutListener methods
- // ---------------------------------------------------------------------
-
- @Override
- public void notifyTimeout(AllocationID key, UUID ticket) {
- checkInit();
-
- if (slotActions != null) {
- slotActions.timeoutSlot(key, ticket);
- }
- }
-
- // ---------------------------------------------------------------------
- // Internal methods
- // ---------------------------------------------------------------------
-
- @Nullable
- private TaskSlot<T> getTaskSlot(AllocationID allocationId) {
- Preconditions.checkNotNull(allocationId);
-
- return allocatedSlots.get(allocationId);
- }
-
- private void checkInit() {
- Preconditions.checkState(started, "The %s has to be started.", TaskSlotTable.class.getSimpleName());
- }
-
- // ---------------------------------------------------------------------
- // Static utility classes
- // ---------------------------------------------------------------------
-
- /**
- * Mapping class between a {@link TaskSlotPayload} and a {@link TaskSlot}.
- */
- private static final class TaskSlotMapping<T extends TaskSlotPayload> {
- private final T task;
- private final TaskSlot<T> taskSlot;
-
- private TaskSlotMapping(T task, TaskSlot<T> taskSlot) {
- this.task = Preconditions.checkNotNull(task);
- this.taskSlot = Preconditions.checkNotNull(taskSlot);
- }
-
- public T getTask() {
- return task;
- }
-
- public TaskSlot<T> getTaskSlot() {
- return taskSlot;
- }
- }
-
- /**
- * Iterator over {@link AllocationID} of the {@link TaskSlot} of a given job. Additionally,
- * the task slots identified by the allocation ids are in the given state.
- */
- private final class AllocationIDIterator implements Iterator<AllocationID> {
- private final Iterator<TaskSlot<T>> iterator;
-
- private AllocationIDIterator(JobID jobId, TaskSlotState state) {
- iterator = new TaskSlotIterator(jobId, state);
- }
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public AllocationID next() {
- try {
- return iterator.next().getAllocationId();
- } catch (NoSuchElementException e) {
- throw new NoSuchElementException("No more allocation ids.");
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Cannot remove allocation ids via this iterator.");
- }
- }
-
- /**
- * Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given
- * job.
- */
- private final class TaskSlotIterator implements Iterator<TaskSlot<T>> {
- private final Iterator<AllocationID> allSlots;
- private final TaskSlotState state;
-
- private TaskSlot<T> currentSlot;
-
- private TaskSlotIterator(JobID jobId, TaskSlotState state) {
-
- Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
-
- if (allocationIds == null || allocationIds.isEmpty()) {
- allSlots = Collections.emptyIterator();
- } else {
- allSlots = allocationIds.iterator();
- }
-
- this.state = Preconditions.checkNotNull(state);
-
- this.currentSlot = null;
- }
-
- @Override
- public boolean hasNext() {
- while (currentSlot == null && allSlots.hasNext()) {
- AllocationID tempSlot = allSlots.next();
-
- TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
-
- if (taskSlot != null && taskSlot.getState() == state) {
- currentSlot = taskSlot;
- }
- }
-
- return currentSlot != null;
- }
-
- @Override
- public TaskSlot<T> next() {
- if (currentSlot != null) {
- TaskSlot<T> result = currentSlot;
-
- currentSlot = null;
-
- return result;
- } else {
- while (true) {
- AllocationID tempSlot;
-
- try {
- tempSlot = allSlots.next();
- } catch (NoSuchElementException e) {
- throw new NoSuchElementException("No more task slots.");
- }
-
- TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
-
- if (taskSlot != null && taskSlot.getState() == state) {
- return taskSlot;
- }
- }
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Cannot remove task slots via this iterator.");
- }
- }
-
- /**
- * Iterator over all {@link TaskSlotPayload} for a given job.
- */
- private final class TaskIterator implements Iterator<T> {
- private final Iterator<TaskSlot<T>> taskSlotIterator;
-
- private Iterator<T> currentTasks;
-
- private TaskIterator(JobID jobId) {
- this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
-
- this.currentTasks = null;
- }
-
- @Override
- public boolean hasNext() {
- while ((currentTasks == null || !currentTasks.hasNext()) && taskSlotIterator.hasNext()) {
- TaskSlot<T> taskSlot = taskSlotIterator.next();
-
- currentTasks = taskSlot.getTasks();
- }
-
- return (currentTasks != null && currentTasks.hasNext());
- }
-
- @Override
- public T next() {
- while ((currentTasks == null || !currentTasks.hasNext())) {
- TaskSlot<T> taskSlot;
-
- try {
- taskSlot = taskSlotIterator.next();
- } catch (NoSuchElementException e) {
- throw new NoSuchElementException("No more tasks.");
- }
-
- currentTasks = taskSlot.getTasks();
- }
-
- return currentTasks.next();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");
- }
- }
+ MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
new file mode 100644
index 0000000..873c1ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
@@ -0,0 +1,773 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceBudgetManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor.DummyComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link TaskSlotTable}.
+ */
+public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTable<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTableImpl.class);
+
+ /**
+ * Number of slots in static slot allocation.
+ * If slot is requested with an index, the requested index must within the range of [0, numberSlots).
+ * When generating slot report, we should always generate slots with index in [0, numberSlots) even the slot does not exist.
+ */
+ private final int numberSlots;
+
+ /** Slot resource profile for static slot allocation. */
+ private final ResourceProfile defaultSlotResourceProfile;
+
+ /** Page size for memory manager. */
+ private final int memoryPageSize;
+
+ /** Timer service used to time out allocated slots. */
+ private final TimerService<AllocationID> timerService;
+
+ /** The list of all task slots. */
+ private final Map<Integer, TaskSlot<T>> taskSlots;
+
+ /** Mapping from allocation id to task slot. */
+ private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
+
+ /** Mapping from execution attempt id to task and task slot. */
+ private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;
+
+ /** Mapping from job id to allocated slots for a job. */
+ private final Map<JobID, Set<AllocationID>> slotsPerJob;
+
+ /** Interface for slot actions, such as freeing them or timing them out. */
+ @Nullable
+ private SlotActions slotActions;
+
+ /** The table state. */
+ private volatile State state;
+
+ private final ResourceBudgetManager budgetManager;
+
+ /** The closing future is completed when all slot are freed and state is closed. */
+ private final CompletableFuture<Void> closingFuture;
+
+ /** {@link ComponentMainThreadExecutor} to schedule internal calls to the main thread. */
+ private ComponentMainThreadExecutor mainThreadExecutor = new DummyComponentMainThreadExecutor(
+ "TaskSlotTableImpl is not initialized with proper main thread executor, " +
+ "call to TaskSlotTableImpl#start is required");
+
+ public TaskSlotTableImpl(
+ final int numberSlots,
+ final ResourceProfile totalAvailableResourceProfile,
+ final ResourceProfile defaultSlotResourceProfile,
+ final int memoryPageSize,
+ final TimerService<AllocationID> timerService) {
+ Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0.");
+
+ this.numberSlots = numberSlots;
+ this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
+ this.memoryPageSize = memoryPageSize;
+
+ this.taskSlots = new HashMap<>(numberSlots);
+
+ this.timerService = Preconditions.checkNotNull(timerService);
+
+ budgetManager = new ResourceBudgetManager(Preconditions.checkNotNull(totalAvailableResourceProfile));
+
+ allocatedSlots = new HashMap<>(numberSlots);
+
+ taskSlotMappings = new HashMap<>(4 * numberSlots);
+
+ slotsPerJob = new HashMap<>(4);
+
+ slotActions = null;
+ state = State.CREATED;
+ closingFuture = new CompletableFuture<>();
+ }
+
+ @Override
+ public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
+ Preconditions.checkState(
+ state == State.CREATED,
+ "The %s has to be just created before starting",
+ TaskSlotTableImpl.class.getSimpleName());
+ this.slotActions = Preconditions.checkNotNull(initialSlotActions);
+ this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
+
+ timerService.start(this);
+
+ state = State.RUNNING;
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ if (state == State.CREATED) {
+ state = State.CLOSED;
+ closingFuture.complete(null);
+ } else if (state == State.RUNNING) {
+ state = State.CLOSING;
+ final FlinkException cause = new FlinkException("Closing task slot table");
+ CompletableFuture<Void> cleanupFuture = FutureUtils
+ .waitForAll(
+ new ArrayList<>(allocatedSlots.values())
+ .stream()
+ .map(slot -> freeSlotInternal(slot, cause))
+ .collect(Collectors.toList()))
+ .thenRunAsync(
+ () -> {
+ state = State.CLOSED;
+ timerService.stop();
+ },
+ mainThreadExecutor);
+ FutureUtils.forward(cleanupFuture, closingFuture);
+ }
+ return closingFuture;
+ }
+
+ @VisibleForTesting
+ public boolean isClosed() {
+ return state == State.CLOSED;
+ }
+
+ @Override
+ public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+ final Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+
+ if (allocationIds == null) {
+ return Collections.emptySet();
+ } else {
+ return Collections.unmodifiableSet(allocationIds);
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // Slot report methods
+ // ---------------------------------------------------------------------
+
+ @Override
+ public SlotReport createSlotReport(ResourceID resourceId) {
+ List<SlotStatus> slotStatuses = new ArrayList<>();
+
+ for (int i = 0; i < numberSlots; i++) {
+ SlotID slotId = new SlotID(resourceId, i);
+ SlotStatus slotStatus;
+ if (taskSlots.containsKey(i)) {
+ TaskSlot<T> taskSlot = taskSlots.get(i);
+
+ slotStatus = new SlotStatus(
+ slotId,
+ taskSlot.getResourceProfile(),
+ taskSlot.getJobId(),
+ taskSlot.getAllocationId());
+ } else {
+ slotStatus = new SlotStatus(
+ slotId,
+ defaultSlotResourceProfile,
+ null,
+ null);
+ }
+
+ slotStatuses.add(slotStatus);
+ }
+
+ for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
+ if (taskSlot.getIndex() < 0) {
+ SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
+ SlotStatus slotStatus = new SlotStatus(
+ slotID,
+ taskSlot.getResourceProfile(),
+ taskSlot.getJobId(),
+ taskSlot.getAllocationId());
+ slotStatuses.add(slotStatus);
+ }
+ }
+
+ final SlotReport slotReport = new SlotReport(slotStatuses);
+
+ return slotReport;
+ }
+
+ // ---------------------------------------------------------------------
+ // Slot methods
+ // ---------------------------------------------------------------------
+
+ @VisibleForTesting
+ @Override
+ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
+ return allocateSlot(index, jobId, allocationId, defaultSlotResourceProfile, slotTimeout);
+ }
+
+ @Override
+ public boolean allocateSlot(
+ int index,
+ JobID jobId,
+ AllocationID allocationId,
+ ResourceProfile resourceProfile,
+ Time slotTimeout) {
+ checkRunning();
+
+ Preconditions.checkArgument(index < numberSlots);
+
+ TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
+ if (taskSlot != null) {
+ LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
+ return false;
+ }
+
+ if (taskSlots.containsKey(index)) {
+ TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
+ LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
+ index,
+ duplicatedTaskSlot.getResourceProfile(),
+ duplicatedTaskSlot.getJobId(),
+ duplicatedTaskSlot.getAllocationId());
+ return duplicatedTaskSlot.getJobId().equals(jobId) &&
+ duplicatedTaskSlot.getAllocationId().equals(allocationId);
+ } else if (allocatedSlots.containsKey(allocationId)) {
+ return true;
+ }
+
+ resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
+
+ if (!budgetManager.reserve(resourceProfile)) {
+ LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "
+ + "while the currently remaining available resources are {}, total is {}.",
+ resourceProfile,
+ budgetManager.getAvailableBudget(),
+ budgetManager.getTotalBudget());
+ return false;
+ }
+
+ taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
+ if (index >= 0) {
+ taskSlots.put(index, taskSlot);
+ }
+
+ // update the allocation id to task slot map
+ allocatedSlots.put(allocationId, taskSlot);
+
+ // register a timeout for this slot since it's in state allocated
+ timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
+
+ // add this slot to the set of job slots
+ Set<AllocationID> slots = slotsPerJob.get(jobId);
+
+ if (slots == null) {
+ slots = new HashSet<>(4);
+ slotsPerJob.put(jobId, slots);
+ }
+
+ slots.add(allocationId);
+
+ return true;
+ }
+
+ @Override
+ public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
+ checkRunning();
+
+ TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+ if (taskSlot != null) {
+ if (taskSlot.markActive()) {
+ // unregister a potential timeout
+ LOG.info("Activate slot {}.", allocationId);
+
+ timerService.unregisterTimeout(allocationId);
+
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ throw new SlotNotFoundException(allocationId);
+ }
+ }
+
+ @Override
+ public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
+ checkStarted();
+
+ TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+ if (taskSlot != null) {
+ if (taskSlot.markInactive()) {
+ // register a timeout to free the slot
+ timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
+
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ throw new SlotNotFoundException(allocationId);
+ }
+ }
+
+ @Override
+ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+ checkStarted();
+
+ TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+ if (taskSlot != null) {
+ return freeSlotInternal(taskSlot, cause).isDone() ? taskSlot.getIndex() : -1;
+ } else {
+ throw new SlotNotFoundException(allocationId);
+ }
+ }
+
+ private CompletableFuture<Void> freeSlotInternal(TaskSlot<T> taskSlot, Throwable cause) {
+ AllocationID allocationId = taskSlot.getAllocationId();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Free slot {}.", taskSlot, cause);
+ } else {
+ LOG.info("Free slot {}.", taskSlot);
+ }
+
+ if (taskSlot.isEmpty()) {
+ // remove the allocation id to task slot mapping
+ allocatedSlots.remove(allocationId);
+
+ // unregister a potential timeout
+ timerService.unregisterTimeout(allocationId);
+
+ JobID jobId = taskSlot.getJobId();
+ Set<AllocationID> slots = slotsPerJob.get(jobId);
+
+ if (slots == null) {
+ throw new IllegalStateException("There are no more slots allocated for the job " + jobId +
+ ". This indicates a programming bug.");
+ }
+
+ slots.remove(allocationId);
+
+ if (slots.isEmpty()) {
+ slotsPerJob.remove(jobId);
+ }
+
+ taskSlots.remove(taskSlot.getIndex());
+ budgetManager.release(taskSlot.getResourceProfile());
+ }
+ return taskSlot.closeAsync(cause);
+ }
+
+ @Override
+ public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+ checkStarted();
+
+ return state == State.RUNNING && timerService.isValid(allocationId, ticket);
+ }
+
+ @Override
+ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+ TaskSlot<T> taskSlot = taskSlots.get(index);
+ if (taskSlot != null) {
+ return taskSlot.isAllocated(jobId, allocationId);
+ } else if (index < 0) {
+ return allocatedSlots.containsKey(allocationId);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
+ TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+ if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
+ return taskSlot.markActive();
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isSlotFree(int index) {
+ return !taskSlots.containsKey(index);
+ }
+
+ @Override
+ public boolean hasAllocatedSlots(JobID jobId) {
+ return getAllocatedSlots(jobId).hasNext();
+ }
+
+ @Override
+ public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
+ return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
+ }
+
+ @Override
+ public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+ return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
+ }
+
+ @Override
+ @Nullable
+ public JobID getOwningJob(AllocationID allocationId) {
+ final TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+ if (taskSlot != null) {
+ return taskSlot.getJobId();
+ } else {
+ return null;
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // Task methods
+ // ---------------------------------------------------------------------
+
+ @Override
+ public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
+ checkRunning();
+ Preconditions.checkNotNull(task);
+
+ TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
+
+ if (taskSlot != null) {
+ if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
+ if (taskSlot.add(task)) {
+ taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));
+
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
+ }
+ } else {
+ throw new SlotNotFoundException(task.getAllocationId());
+ }
+ }
+
+ @Override
+ public T removeTask(ExecutionAttemptID executionAttemptID) {
+ checkStarted();
+
+ TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
+
+ if (taskSlotMapping != null) {
+ T task = taskSlotMapping.getTask();
+ TaskSlot<T> taskSlot = taskSlotMapping.getTaskSlot();
+
+ taskSlot.remove(task.getExecutionId());
+
+ if (taskSlot.isReleasing() && taskSlot.isEmpty()) {
+ slotActions.freeSlot(taskSlot.getAllocationId());
+ }
+
+ return task;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public T getTask(ExecutionAttemptID executionAttemptID) {
+ TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.get(executionAttemptID);
+
+ if (taskSlotMapping != null) {
+ return taskSlotMapping.getTask();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Iterator<T> getTasks(JobID jobId) {
+ return new PayloadIterator(jobId);
+ }
+
+ @Override
+ public AllocationID getCurrentAllocation(int index) {
+ TaskSlot<T> taskSlot = taskSlots.get(index);
+ if (taskSlot == null) {
+ return null;
+ }
+ return taskSlot.getAllocationId();
+ }
+
+ @Override
+ public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
+ TaskSlot<T> taskSlot = getTaskSlot(allocationID);
+ if (taskSlot != null) {
+ return taskSlot.getMemoryManager();
+ } else {
+ throw new SlotNotFoundException(allocationID);
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // TimeoutListener methods
+ // ---------------------------------------------------------------------
+
+ @Override
+ public void notifyTimeout(AllocationID key, UUID ticket) {
+ checkStarted();
+
+ if (slotActions != null) {
+ slotActions.timeoutSlot(key, ticket);
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // Internal methods
+ // ---------------------------------------------------------------------
+
+ @Nullable
+ private TaskSlot<T> getTaskSlot(AllocationID allocationId) {
+ Preconditions.checkNotNull(allocationId);
+
+ return allocatedSlots.get(allocationId);
+ }
+
+ private void checkRunning() {
+ Preconditions.checkState(
+ state == State.RUNNING,
+ "The %s has to be running.", TaskSlotTableImpl.class.getSimpleName());
+ }
+
+ private void checkStarted() {
+ Preconditions.checkState(
+ state != State.CREATED,
+ "The %s has to be started (not created).", TaskSlotTableImpl.class.getSimpleName());
+ }
+
+ // ---------------------------------------------------------------------
+ // Static utility classes
+ // ---------------------------------------------------------------------
+
+ /**
+ * Mapping class between a {@link TaskSlotPayload} and a {@link TaskSlot}.
+ */
+ private static final class TaskSlotMapping<T extends TaskSlotPayload> {
+ private final T task;
+ private final TaskSlot<T> taskSlot;
+
+ private TaskSlotMapping(T task, TaskSlot<T> taskSlot) {
+ this.task = Preconditions.checkNotNull(task);
+ this.taskSlot = Preconditions.checkNotNull(taskSlot);
+ }
+
+ public T getTask() {
+ return task;
+ }
+
+ public TaskSlot<T> getTaskSlot() {
+ return taskSlot;
+ }
+ }
+
+ /**
+ * Iterator over {@link AllocationID} of the {@link TaskSlot} of a given job. Additionally,
+ * the task slots identified by the allocation ids are in the given state.
+ */
+ private final class AllocationIDIterator implements Iterator<AllocationID> {
+ private final Iterator<TaskSlot<T>> iterator;
+
+ private AllocationIDIterator(JobID jobId, TaskSlotState state) {
+ iterator = new TaskSlotIterator(jobId, state);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public AllocationID next() {
+ try {
+ return iterator.next().getAllocationId();
+ } catch (NoSuchElementException e) {
+ throw new NoSuchElementException("No more allocation ids.");
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot remove allocation ids via this iterator.");
+ }
+ }
+
+ /**
+ * Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given
+ * job.
+ */
+ private final class TaskSlotIterator implements Iterator<TaskSlot<T>> {
+ private final Iterator<AllocationID> allSlots;
+ private final TaskSlotState state;
+
+ private TaskSlot<T> currentSlot;
+
+ private TaskSlotIterator(JobID jobId, TaskSlotState state) {
+
+ Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+
+ if (allocationIds == null || allocationIds.isEmpty()) {
+ allSlots = Collections.emptyIterator();
+ } else {
+ allSlots = allocationIds.iterator();
+ }
+
+ this.state = Preconditions.checkNotNull(state);
+
+ this.currentSlot = null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (currentSlot == null && allSlots.hasNext()) {
+ AllocationID tempSlot = allSlots.next();
+
+ TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
+
+ if (taskSlot != null && taskSlot.getState() == state) {
+ currentSlot = taskSlot;
+ }
+ }
+
+ return currentSlot != null;
+ }
+
+ @Override
+ public TaskSlot<T> next() {
+ if (currentSlot != null) {
+ TaskSlot<T> result = currentSlot;
+
+ currentSlot = null;
+
+ return result;
+ } else {
+ while (true) {
+ AllocationID tempSlot;
+
+ try {
+ tempSlot = allSlots.next();
+ } catch (NoSuchElementException e) {
+ throw new NoSuchElementException("No more task slots.");
+ }
+
+ TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
+
+ if (taskSlot != null && taskSlot.getState() == state) {
+ return taskSlot;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot remove task slots via this iterator.");
+ }
+ }
+
+ /**
+ * Iterator over all {@link TaskSlotPayload} for a given job.
+ */
+ private final class PayloadIterator implements Iterator<T> {
+ private final Iterator<TaskSlot<T>> taskSlotIterator;
+
+ private Iterator<T> currentTasks;
+
+ private PayloadIterator(JobID jobId) {
+ this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
+
+ this.currentTasks = null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while ((currentTasks == null || !currentTasks.hasNext()) && taskSlotIterator.hasNext()) {
+ TaskSlot<T> taskSlot = taskSlotIterator.next();
+
+ currentTasks = taskSlot.getTasks();
+ }
+
+ return (currentTasks != null && currentTasks.hasNext());
+ }
+
+ @Override
+ public T next() {
+ while ((currentTasks == null || !currentTasks.hasNext())) {
+ TaskSlot<T> taskSlot;
+
+ try {
+ taskSlot = taskSlotIterator.next();
+ } catch (NoSuchElementException e) {
+ throw new NoSuchElementException("No more tasks.");
+ }
+
+ currentTasks = taskSlot.getTasks();
+ }
+
+ return currentTasks.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");
+ }
+ }
+
+ private enum State {
+ CREATED,
+ RUNNING,
+ CLOSING,
+ CLOSED
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index cdde9be..e82e9b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -33,17 +33,13 @@
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.messages.TaskBackPressureResponse;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -55,10 +51,15 @@
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
@@ -141,7 +142,7 @@
final ExecutionAttemptID eid = new ExecutionAttemptID();
- final TaskDeploymentDescriptor tdd = createTestTaskDeploymentDescriptor("test task", eid, TaskExecutorTest.TestInvokable.class);
+ final TaskDeploymentDescriptor tdd = createTestTaskDeploymentDescriptor("test task", eid, FutureCompletingInvokable.class);
final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
@@ -819,4 +820,21 @@
producedPartitions,
inputGates);
}
+
+ /**
+ * Test invokable which completes the given future when executed.
+ */
+ public static class FutureCompletingInvokable extends AbstractInvokable {
+
+ static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture<>();
+
+ public FutureCompletingInvokable(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ COMPLETABLE_FUTURE.complete(true);
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 423e401..4094be7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -43,8 +43,6 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
@@ -57,7 +55,6 @@
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
-import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
@@ -67,7 +64,6 @@
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
@@ -98,14 +94,12 @@
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
+import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
@@ -282,7 +276,7 @@
@Test
public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+ final TaskSlotTableImpl<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
@@ -316,7 +310,7 @@
RpcUtils.terminateRpcEndpoint(taskManager, timeout);
}
- assertThat(taskSlotTable.isStopped(), is(true));
+ assertThat(taskSlotTable.isClosed(), is(true));
assertThat(nettyShuffleEnvironment.isClosed(), is(true));
assertThat(kvStateService.isShutdown(), is(true));
}
@@ -429,9 +423,11 @@
rpc.registerGateway(rmAddress, rmGateway);
- final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
- final SlotReport slotReport = new SlotReport();
- when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+ final TaskSlotTable<Task> taskSlotTable = TestingTaskSlotTable
+ .<Task>newBuilder()
+ .createSlotReportSupplier(SlotReport::new)
+ .closeAsyncReturns(CompletableFuture.completedFuture(null))
+ .build();
HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
@@ -512,7 +508,12 @@
new JobID(),
new AllocationID()));
- final TestingTaskSlotTable taskSlotTable = new TestingTaskSlotTable(new ArrayDeque<>(Arrays.asList(slotReport1, slotReport2)));
+ final Queue<SlotReport> reports = new ArrayDeque<>(Arrays.asList(slotReport1, slotReport2));
+ final TaskSlotTable<Task> taskSlotTable = TestingTaskSlotTable
+ .<Task>newBuilder()
+ .createSlotReportSupplier(reports::poll)
+ .closeAsyncReturns(CompletableFuture.completedFuture(null))
+ .build();
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
@@ -632,9 +633,11 @@
rpc.registerGateway(address1, rmGateway1);
rpc.registerGateway(address2, rmGateway2);
- final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
- final SlotReport slotReport = new SlotReport();
- when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+ final TaskSlotTable<Task> taskSlotTable = TestingTaskSlotTable
+ .<Task>newBuilder()
+ .createSlotReportSupplier(SlotReport::new)
+ .closeAsyncReturns(CompletableFuture.completedFuture(null))
+ .build();
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
@@ -680,123 +683,26 @@
}
}
- /**
- * Tests that we can submit a task to the TaskManager given that we've allocated a slot there.
- */
- @Test(timeout = 10000L)
- public void testTaskSubmission() throws Exception {
- final JobMasterId jobMasterId = JobMasterId.generate();
- final AllocationID allocationId = new AllocationID();
- final TaskDeploymentDescriptor taskDeploymentDescriptor = TaskDeploymentDescriptorBuilder
- .newBuilder(jobId, TestInvokable.class)
- .setAllocationId(allocationId)
- .build();
-
- final OneShotLatch taskInTerminalState = new OneShotLatch();
- final TaskManagerActions taskManagerActions = createTaskManagerActionsWithTerminalStateTrigger(taskInTerminalState);
- final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId, taskManagerActions);
- final TaskExecutor taskExecutor = createTaskExecutorWithJobManagerTable(jobManagerTable);
-
- try {
- taskExecutor.start();
-
- final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
- final JobMasterGateway jobMasterGateway = jobManagerTable.get(jobId).getJobManagerGateway();
- requestSlotFromTaskExecutor(taskExecutorGateway, jobMasterGateway, allocationId);
-
- taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterId, timeout);
-
- CompletableFuture<Boolean> completionFuture = TestInvokable.COMPLETABLE_FUTURE;
-
- completionFuture.get();
-
- taskInTerminalState.await();
- } finally {
- RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
- }
- }
-
- /**
- * Test invokable which completes the given future when executed.
- */
- public static class TestInvokable extends AbstractInvokable {
-
- static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture<>();
-
- public TestInvokable(Environment environment) {
- super(environment);
- }
-
- @Override
- public void invoke() throws Exception {
- COMPLETABLE_FUTURE.complete(true);
- }
- }
-
@Test
- public void testTaskInterruptionAndTerminationOnShutdown() throws Exception {
- final JobMasterId jobMasterId = JobMasterId.generate();
- final AllocationID allocationId = new AllocationID();
- final TaskDeploymentDescriptor taskDeploymentDescriptor = TaskDeploymentDescriptorBuilder
- .newBuilder(jobId, TestInterruptableInvokable.class)
- .setAllocationId(allocationId)
- .build();
-
- final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId, new NoOpTaskManagerActions());
- final TaskExecutor taskExecutor = createTaskExecutorWithJobManagerTable(jobManagerTable);
-
+ public void testTaskSlotTableTerminationOnShutdown() throws Exception {
+ CompletableFuture<Void> taskSlotTableClosingFuture = new CompletableFuture<>();
+ TaskExecutorTestingContext submissionContext = createTaskExecutorTestingContext(
+ TestingTaskSlotTable.<Task>newBuilder().closeAsyncReturns(taskSlotTableClosingFuture).build());
+ final CompletableFuture<Void> taskExecutorTerminationFuture;
try {
- taskExecutor.start();
-
- final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
- final JobMasterGateway jobMasterGateway = jobManagerTable.get(jobId).getJobManagerGateway();
- requestSlotFromTaskExecutor(taskExecutorGateway, jobMasterGateway, allocationId);
-
- taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterId, timeout);
-
- TestInterruptableInvokable.STARTED_FUTURE.get();
+ submissionContext.start();
} finally {
- taskExecutor.closeAsync();
+ taskExecutorTerminationFuture = submissionContext.taskExecutor.closeAsync();
}
- // check task has been interrupted
- TestInterruptableInvokable.INTERRUPTED_FUTURE.get();
-
// check task executor is waiting for the task completion and has not terminated yet
- final CompletableFuture<Void> taskExecutorTerminationFuture = taskExecutor.getTerminationFuture();
assertThat(taskExecutorTerminationFuture.isDone(), is(false));
- // check task executor has exited after task completion
- TestInterruptableInvokable.DONE_FUTURE.complete(null);
+ // check task executor has exited after task slot table termination
+ taskSlotTableClosingFuture.complete(null);
taskExecutorTerminationFuture.get();
}
- private void requestSlotFromTaskExecutor(
- TaskExecutorGateway taskExecutorGateway,
- JobMasterGateway jobMasterGateway,
- AllocationID allocationId) throws ExecutionException, InterruptedException {
- final CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture =
- new CompletableFuture<>();
- ResourceManagerId resourceManagerId = createAndRegisterResourceManager(initialSlotReportFuture);
- initialSlotReportFuture.get();
-
- taskExecutorGateway
- .requestSlot(
- new SlotID(ResourceID.generate(), 0),
- jobId,
- allocationId,
- ResourceProfile.ZERO,
- jobMasterGateway.getAddress(),
- resourceManagerId,
- timeout)
- .get();
-
- // now inform the task manager about the new job leader
- jobManagerLeaderRetriever.notifyListener(
- jobMasterGateway.getAddress(),
- jobMasterGateway.getFencingToken().toUUID());
- }
-
private ResourceManagerId createAndRegisterResourceManager(
CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture) {
final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
@@ -814,52 +720,6 @@
return resourceManagerGateway.getFencingToken();
}
- private TaskExecutor createTaskExecutorWithJobManagerTable(JobManagerTable jobManagerTable) throws IOException {
- final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
- return createTaskExecutor(new TaskManagerServicesBuilder()
- .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1))
- .setJobManagerTable(jobManagerTable)
- .setTaskStateManager(localStateStoresManager)
- .build());
- }
-
- private JobManagerTable createJobManagerTableWithOneJob(
- JobMasterId jobMasterId,
- TaskManagerActions taskManagerActions) {
- final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
- .setFencingTokenSupplier(() -> jobMasterId)
- .setOfferSlotsFunction((resourceID, slotOffers) -> CompletableFuture.completedFuture(slotOffers))
- .build();
- rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
-
- final JobManagerConnection jobManagerConnection = new JobManagerConnection(
- jobId,
- ResourceID.generate(),
- jobMasterGateway,
- taskManagerActions,
- new TestCheckpointResponder(),
- new TestGlobalAggregateManager(),
- ContextClassLoaderLibraryCacheManager.INSTANCE,
- new NoOpResultPartitionConsumableNotifier(),
- (j, i, r) -> CompletableFuture.completedFuture(null));
-
- final JobManagerTable jobManagerTable = new JobManagerTable();
- jobManagerTable.put(jobId, jobManagerConnection);
- return jobManagerTable;
- }
-
- private static TaskManagerActions createTaskManagerActionsWithTerminalStateTrigger(
- final OneShotLatch taskInTerminalState) {
- return new NoOpTaskManagerActions() {
- @Override
- public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
- if (taskExecutionState.getExecutionState().isTerminal()) {
- taskInTerminalState.trigger();
- }
- }
- };
- }
-
/**
* Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
* the job leader, it will offer all reserved slots to the JobManager.
@@ -1963,23 +1823,9 @@
@Test
public void testDynamicSlotAllocation() throws Exception {
- final JobMasterId jobMasterId = JobMasterId.generate();
final AllocationID allocationId = new AllocationID();
-
- final OneShotLatch taskInTerminalState = new OneShotLatch();
- final TaskManagerActions taskManagerActions = createTaskManagerActionsWithTerminalStateTrigger(taskInTerminalState);
- final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId, taskManagerActions);
- final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
- final TaskExecutor taskExecutor = createTaskExecutor(new TaskManagerServicesBuilder()
- .setTaskSlotTable(taskSlotTable)
- .setJobManagerTable(jobManagerTable)
- .build());
-
- try {
- taskExecutor.start();
-
- final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
- final JobMasterGateway jobMasterGateway = jobManagerTable.get(jobId).getJobManagerGateway();
+ try (TaskExecutorTestingContext submissionContext = createTaskExecutorTestingContext(2)) {
+ submissionContext.start();
final CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture =
new CompletableFuture<>();
ResourceManagerId resourceManagerId = createAndRegisterResourceManager(initialSlotReportFuture);
@@ -1987,25 +1833,24 @@
final ResourceProfile resourceProfile = DEFAULT_RESOURCE_PROFILE
.merge(ResourceProfile.newBuilder().setCpuCores(0.1).build());
- taskExecutorGateway
+ submissionContext.taskExecutor
+ .getSelfGateway(TaskExecutorGateway.class)
.requestSlot(
SlotID.generateDynamicSlotID(ResourceID.generate()),
jobId,
allocationId,
resourceProfile,
- jobMasterGateway.getAddress(),
+ submissionContext.jobMasterGateway.getAddress(),
resourceManagerId,
timeout)
.get();
ResourceID resourceId = ResourceID.generate();
- SlotReport slotReport = taskSlotTable.createSlotReport(resourceId);
+ SlotReport slotReport = submissionContext.taskSlotTable.createSlotReport(resourceId);
assertThat(slotReport, containsInAnyOrder(
- new SlotStatus(new SlotID(resourceId, 0), DEFAULT_RESOURCE_PROFILE),
- new SlotStatus(new SlotID(resourceId, 1), DEFAULT_RESOURCE_PROFILE),
- new SlotStatus(SlotID.generateDynamicSlotID(resourceId), resourceProfile, jobId, allocationId)));
- } finally {
- RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+ new SlotStatus(new SlotID(resourceId, 0), DEFAULT_RESOURCE_PROFILE),
+ new SlotStatus(new SlotID(resourceId, 1), DEFAULT_RESOURCE_PROFILE),
+ new SlotStatus(SlotID.generateDynamicSlotID(resourceId), resourceProfile, jobId, allocationId)));
}
}
@@ -2070,6 +1915,95 @@
TaskManagerRunner.createBackPressureSampleService(configuration, rpc.getScheduledExecutor()));
}
+ private TaskExecutorTestingContext createTaskExecutorTestingContext(int numberOfSlots) throws IOException {
+ return createTaskExecutorTestingContext(TaskSlotUtils.createTaskSlotTable(numberOfSlots));
+ }
+
+ private TaskExecutorTestingContext createTaskExecutorTestingContext(final TaskSlotTable<Task> taskSlotTable) throws IOException {
+ final OneShotLatch offerSlotsLatch = new OneShotLatch();
+ final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
+ .setOfferSlotsFunction((resourceID, slotOffers) -> {
+ offerSlotsLatch.trigger();
+ return CompletableFuture.completedFuture(slotOffers);
+ }).build();
+ rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+ final JobLeaderService jobLeaderService = new JobLeaderService(
+ taskManagerLocation,
+ RetryingRegistrationConfiguration.defaultConfiguration());
+
+ TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
+ final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(new TaskManagerServicesBuilder()
+ .setTaskSlotTable(taskSlotTable)
+ .setJobLeaderService(jobLeaderService)
+ .setTaskStateManager(stateStoresManager)
+ .build());
+
+ jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
+ return new TaskExecutorTestingContext(offerSlotsLatch, jobMasterGateway, jobLeaderService, taskSlotTable, taskExecutor);
+ }
+
+ private class TaskExecutorTestingContext implements AutoCloseable {
+ private final OneShotLatch offerSlotsLatch;
+ private final TestingJobMasterGateway jobMasterGateway;
+ private final JobLeaderService jobLeaderService;
+ private final TaskSlotTable taskSlotTable;
+ private final TestingTaskExecutor taskExecutor;
+
+ private TaskExecutorTestingContext(
+ OneShotLatch offerSlotsLatch,
+ TestingJobMasterGateway jobMasterGateway,
+ JobLeaderService jobLeaderService,
+ TaskSlotTable taskSlotTable,
+ TestingTaskExecutor taskExecutor) {
+ this.offerSlotsLatch = offerSlotsLatch;
+ this.jobMasterGateway = jobMasterGateway;
+ this.jobLeaderService = jobLeaderService;
+ this.taskSlotTable = taskSlotTable;
+ this.taskExecutor = taskExecutor;
+ }
+
+ private void start() {
+ taskExecutor.start();
+ taskExecutor.waitUntilStarted();
+ }
+
+ private void startAllocateSlotAndSubmit(
+ final Class<? extends AbstractInvokable> task) throws Exception {
+ final AllocationID allocationId = new AllocationID();
+
+ start();
+
+ taskSlotTable.allocateSlot(0, jobId, allocationId, Time.milliseconds(10000L));
+
+ // we have to add the job after the TaskExecutor, because otherwise the service has not
+ // been properly started.
+ jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());
+ offerSlotsLatch.await();
+
+ taskExecutor
+ .getSelfGateway(TaskExecutorGateway.class)
+ .submitTask(
+ createTaskDeploymentDescriptor(allocationId, task),
+ jobMasterGateway.getFencingToken(),
+ timeout);
+ }
+
+ private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+ final AllocationID allocationId,
+ final Class<? extends AbstractInvokable> task) throws IOException {
+ return TaskDeploymentDescriptorBuilder
+ .newBuilder(jobId, task)
+ .setAllocationId(allocationId)
+ .build();
+ }
+
+ @Override
+ public void close() throws ExecutionException, InterruptedException, TimeoutException {
+ RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+ }
+ }
+
private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService {
private final CompletableFuture<LeaderRetrievalListener> startFuture;
@@ -2165,21 +2099,7 @@
}
}
- private static final class TestingTaskSlotTable extends TaskSlotTable<Task> {
- private final Queue<SlotReport> slotReports;
-
- private TestingTaskSlotTable(Queue<SlotReport> slotReports) {
- super(1, createTotalResourceProfile(1), DEFAULT_RESOURCE_PROFILE, MemoryManager.MIN_PAGE_SIZE, createDefaultTimerService(timeout.toMilliseconds()));
- this.slotReports = slotReports;
- }
-
- @Override
- public SlotReport createSlotReport(ResourceID resourceId) {
- return slotReports.poll();
- }
- }
-
- private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTable<Task> {
+ private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTableImpl<Task> {
private final OneShotLatch allocateSlotLatch;
@@ -2205,7 +2125,7 @@
}
}
- private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTable<Task> {
+ private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTableImpl<Task> {
private final CountDownLatch slotsToActivate;
@@ -2225,36 +2145,4 @@
return result;
}
}
-
- /**
- * Test invokable which completes the given future when interrupted (can be used only once).
- */
- public static class TestInterruptableInvokable extends AbstractInvokable {
- private static final CompletableFuture<Void> INTERRUPTED_FUTURE = new CompletableFuture<>();
- private static final CompletableFuture<Void> STARTED_FUTURE = new CompletableFuture<>();
- private static final CompletableFuture<Void> DONE_FUTURE = new CompletableFuture<>();
-
- public TestInterruptableInvokable(Environment environment) {
- super(environment);
- }
-
- @Override
- public void invoke() {
- STARTED_FUTURE.complete(null);
-
- try {
- INTERRUPTED_FUTURE.get();
- } catch (InterruptedException e) {
- INTERRUPTED_FUTURE.complete(null);
- } catch (ExecutionException e) {
- ExceptionUtils.rethrow(e);
- }
-
- try {
- DONE_FUTURE.get();
- } catch (ExecutionException | InterruptedException e) {
- ExceptionUtils.rethrow(e);
- }
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 8b348e4..c494ffd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -26,11 +26,14 @@
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import java.util.concurrent.CompletableFuture;
+
import static org.mockito.Mockito.mock;
/**
@@ -57,7 +60,7 @@
kvStateService = new KvStateService(new KvStateRegistry(), null, null);
broadcastVariableManager = new BroadcastVariableManager();
taskEventDispatcher = new TaskEventDispatcher();
- taskSlotTable = (TaskSlotTable<Task>) mock(TaskSlotTable.class);
+ taskSlotTable = TestingTaskSlotTable.<Task>newBuilder().closeAsyncReturns(CompletableFuture.completedFuture(null)).build();
jobManagerTable = new JobManagerTable();
jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 1661efb..6d47155 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -51,6 +51,7 @@
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -76,7 +77,6 @@
import java.util.concurrent.CompletableFuture;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -119,14 +119,16 @@
this.jobMasterId = jobMasterId;
- if (slotSize > 0) {
- this.taskSlotTable = TaskSlotUtils.createTaskSlotTable(slotSize);
- } else {
- //noinspection unchecked
- this.taskSlotTable = (TaskSlotTable<Task>) mock(TaskSlotTable.class);
- when(taskSlotTable.tryMarkSlotActive(eq(jobId), any())).thenReturn(true);
- when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
- }
+ this.taskSlotTable = slotSize > 0 ?
+ TaskSlotUtils.createTaskSlotTable(slotSize) :
+ TestingTaskSlotTable
+ .<Task>newBuilder()
+ .tryMarkSlotActiveReturns(true)
+ .addTaskReturns(true)
+ .closeAsyncReturns(CompletableFuture.completedFuture(null))
+ .allocateSlotReturns(true)
+ .memoryManagerGetterReturns(null)
+ .build();
JobMasterGateway jobMasterGateway;
if (testingJobMasterGateway == null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
index 55a71d8..098c367 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
@@ -24,6 +24,8 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.util.TestLogger;
@@ -38,6 +40,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@@ -48,19 +51,16 @@
* Tests for the {@link TaskSlotTable}.
*/
public class TaskSlotTableTest extends TestLogger {
-
private static final Time SLOT_TIMEOUT = Time.seconds(100L);
/**
* Tests that one can can mark allocated slots as active.
*/
@Test
- public void testTryMarkSlotActive() throws SlotNotFoundException {
- final TaskSlotTable<?> taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
+ public void testTryMarkSlotActive() throws Exception {
+ final TaskSlotTableImpl<?> taskSlotTable = createTaskSlotTableAndStart(3);
try {
- taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
final JobID jobId1 = new JobID();
final AllocationID allocationId1 = new AllocationID();
taskSlotTable.allocateSlot(0, jobId1, allocationId1, SLOT_TIMEOUT);
@@ -84,8 +84,8 @@
assertThat(Sets.newHashSet(taskSlotTable.getActiveSlots(jobId1)), is(equalTo(new HashSet<>(Arrays.asList(allocationId2, allocationId1)))));
} finally {
- taskSlotTable.stop();
- assertThat(taskSlotTable.isStopped(), is(true));
+ taskSlotTable.close();
+ assertThat(taskSlotTable.isClosed(), is(true));
}
}
@@ -93,12 +93,8 @@
* Tests that redundant slot allocation with the same AllocationID to a different slot is rejected.
*/
@Test
- public void testRedundantSlotAllocation() {
- final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
- try {
- taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+ public void testRedundantSlotAllocation() throws Exception {
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
@@ -111,18 +107,12 @@
Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
assertThat(allocatedSlots.next().getIndex(), is(0));
assertThat(allocatedSlots.hasNext(), is(false));
- } finally {
- taskSlotTable.stop();
}
}
@Test
- public void testFreeSlot() throws SlotNotFoundException {
- final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
- try {
- taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+ public void testFreeSlot() throws Exception {
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
final JobID jobId = new JobID();
final AllocationID allocationId1 = new AllocationID();
final AllocationID allocationId2 = new AllocationID();
@@ -138,18 +128,12 @@
assertThat(taskSlotTable.isAllocated(1, jobId, allocationId1), is(false));
assertThat(taskSlotTable.isAllocated(1, jobId, allocationId2), is(false));
assertThat(taskSlotTable.isSlotFree(1), is(true));
- } finally {
- taskSlotTable.stop();
}
}
@Test
- public void testSlotAllocationWithDynamicSlotId() {
- final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
- try {
- taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+ public void testSlotAllocationWithDynamicSlotId() throws Exception {
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
@@ -159,18 +143,12 @@
assertThat(allocatedSlots.next().getIndex(), is(-1));
assertThat(allocatedSlots.hasNext(), is(false));
assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true));
- } finally {
- taskSlotTable.stop();
}
}
@Test
- public void testSlotAllocationWithResourceProfile() {
- final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
- try {
- taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+ public void testSlotAllocationWithResourceProfile() throws Exception {
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile = TaskSlotUtils.DEFAULT_RESOURCE_PROFILE
@@ -183,18 +161,12 @@
assertThat(allocatedSlot.getIndex(), is(-1));
assertThat(allocatedSlot.getResourceProfile(), is(resourceProfile));
assertThat(allocatedSlots.hasNext(), is(false));
- } finally {
- taskSlotTable.stop();
}
}
@Test
- public void testSlotAllocationWithResourceProfileFailure() {
- final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
- try {
- taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+ public void testSlotAllocationWithResourceProfileFailure() throws Exception {
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
ResourceProfile resourceProfile = TaskSlotUtils.DEFAULT_RESOURCE_PROFILE;
@@ -204,18 +176,12 @@
Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
assertThat(allocatedSlots.hasNext(), is(false));
- } finally {
- taskSlotTable.stop();
}
}
@Test
- public void testGenerateSlotReport() throws SlotNotFoundException {
- final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
-
- try {
- taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+ public void testGenerateSlotReport() throws Exception {
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(3)) {
final JobID jobId = new JobID();
final AllocationID allocationId1 = new AllocationID();
final AllocationID allocationId2 = new AllocationID();
@@ -238,8 +204,113 @@
is(new SlotStatus(new SlotID(resourceId, 1), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null)),
is(new SlotStatus(new SlotID(resourceId, 2), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null)),
is(new SlotStatus(SlotID.generateDynamicSlotID(resourceId), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId3))));
- } finally {
- taskSlotTable.stop();
}
}
+
+ @Test
+ public void testAllocateSlot() throws Exception {
+ final JobID jobId = new JobID();
+ final AllocationID allocationId = new AllocationID();
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable =
+ createTaskSlotTableWithAllocatedSlot(jobId, allocationId, new TestingSlotActionsBuilder().build())) {
+ Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+ TaskSlot<TaskSlotPayload> nextSlot = allocatedSlots.next();
+ assertThat(nextSlot.getIndex(), is(0));
+ assertThat(nextSlot.getAllocationId(), is(allocationId));
+ assertThat(nextSlot.getJobId(), is(jobId));
+ assertThat(allocatedSlots.hasNext(), is(false));
+ }
+ }
+
+ @Test
+ public void testAddTask() throws Exception {
+ final JobID jobId = new JobID();
+ final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+ final AllocationID allocationId = new AllocationID();
+ TaskSlotPayload task = new TestingTaskSlotPayload(jobId, executionAttemptId, allocationId).terminate();
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithStartedTask(task)) {
+ Iterator<TaskSlotPayload> tasks = taskSlotTable.getTasks(jobId);
+ TaskSlotPayload nextTask = tasks.next();
+ assertThat(nextTask.getExecutionId(), is(executionAttemptId));
+ assertThat(nextTask.getAllocationId(), is(allocationId));
+ assertThat(tasks.hasNext(), is(false));
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testRemoveTaskCallsFreeSlotAction() throws Exception {
+ final JobID jobId = new JobID();
+ final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+ final AllocationID allocationId = new AllocationID();
+ CompletableFuture<AllocationID> freeSlotFuture = new CompletableFuture<>();
+ SlotActions slotActions = new TestingSlotActions(freeSlotFuture::complete, (aid, uid) -> {});
+ TaskSlotPayload task = new TestingTaskSlotPayload(jobId, executionAttemptId, allocationId).terminate();
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithStartedTask(task, slotActions)) {
+ // we have to initiate closing of the slot externally
+ // to enable that the last remaining finished task does the final slot freeing
+ taskSlotTable.freeSlot(allocationId);
+ taskSlotTable.removeTask(executionAttemptId);
+ assertThat(freeSlotFuture.get(), is(allocationId));
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testFreeSlotInterruptsSubmittedTask() throws Exception {
+ TestingTaskSlotPayload task = new TestingTaskSlotPayload();
+ try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithStartedTask(task)) {
+ assertThat(taskSlotTable.freeSlot(task.getAllocationId()), is(-1));
+ task.waitForFailure();
+ task.terminate();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testTableIsClosedOnlyWhenAllTasksTerminated() throws Exception {
+ TestingTaskSlotPayload task = new TestingTaskSlotPayload();
+ final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithStartedTask(task);
+ assertThat(taskSlotTable.freeSlot(task.getAllocationId()), is(-1));
+ CompletableFuture<Void> closingFuture = taskSlotTable.closeAsync();
+ assertThat(closingFuture.isDone(), is(false));
+ task.terminate();
+ closingFuture.get();
+ }
+
+ private static TaskSlotTable<TaskSlotPayload> createTaskSlotTableWithStartedTask(
+ final TaskSlotPayload task) throws SlotNotFoundException, SlotNotActiveException {
+ return createTaskSlotTableWithStartedTask(task, new TestingSlotActionsBuilder().build());
+ }
+
+ private static TaskSlotTable<TaskSlotPayload> createTaskSlotTableWithStartedTask(
+ final TaskSlotPayload task,
+ final SlotActions slotActions) throws SlotNotFoundException, SlotNotActiveException {
+ final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithAllocatedSlot(
+ task.getJobID(),
+ task.getAllocationId(),
+ slotActions);
+ taskSlotTable.markSlotActive(task.getAllocationId());
+ taskSlotTable.addTask(task);
+ return taskSlotTable;
+ }
+
+ private static TaskSlotTable<TaskSlotPayload> createTaskSlotTableWithAllocatedSlot(
+ final JobID jobId,
+ final AllocationID allocationId,
+ final SlotActions slotActions) {
+ final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(1, slotActions);
+ assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, SLOT_TIMEOUT), is(true));
+ return taskSlotTable;
+ }
+
+ private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableAndStart(final int numberOfSlots) {
+ return createTaskSlotTableAndStart(numberOfSlots, new TestingSlotActionsBuilder().build());
+ }
+
+ private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableAndStart(
+ final int numberOfSlots,
+ final SlotActions slotActions) {
+ final TaskSlotTableImpl<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(numberOfSlots);
+ taskSlotTable.start(slotActions, ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ return taskSlotTable;
+ }
+
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java
new file mode 100644
index 0000000..e922c9e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class TaskSlotTest extends TestLogger {
+ private static final JobID JOB_ID = new JobID();
+ private static final AllocationID ALLOCATION_ID = new AllocationID();
+
+ @Test
+ public void testTaskSlotClosedOnlyWhenAddedTasksTerminated() throws Exception {
+ try (TaskSlot<TaskSlotPayload> taskSlot = createTaskSlot()) {
+ taskSlot.markActive();
+ TestingTaskSlotPayload task = new TestingTaskSlotPayload(JOB_ID, new ExecutionAttemptID(), ALLOCATION_ID);
+ taskSlot.add(task);
+
+ CompletableFuture<Void> closingFuture = taskSlot.closeAsync();
+ task.waitForFailure();
+ MemoryManager memoryManager = taskSlot.getMemoryManager();
+
+ assertThat(closingFuture.isDone(), is(false));
+ assertThat(memoryManager.isShutdown(), is(false));
+ task.terminate();
+ closingFuture.get();
+ assertThat(memoryManager.isShutdown(), is(true));
+ }
+ }
+
+ private static <T extends TaskSlotPayload> TaskSlot<T> createTaskSlot() {
+ return new TaskSlot<>(0, ResourceProfile.ZERO, MemoryManager.MIN_PAGE_SIZE, JOB_ID, ALLOCATION_ID);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
index 27c8d1c..38d0c97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
@@ -39,10 +39,10 @@
.setNetworkMemory(new MemorySize(100 * 1024))
.build();
- public static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(int numberOfSlots) {
+ public static <T extends TaskSlotPayload> TaskSlotTableImpl<T> createTaskSlotTable(int numberOfSlots) {
return createTaskSlotTable(
numberOfSlots,
- createDefaultTimerService(DEFAULT_SLOT_TIMEOUT));
+ createDefaultTimerService());
}
public static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(int numberOfSlots, Time timeout) {
@@ -51,10 +51,10 @@
createDefaultTimerService(timeout.toMilliseconds()));
}
- private static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(
+ private static <T extends TaskSlotPayload> TaskSlotTableImpl<T> createTaskSlotTable(
int numberOfSlots,
TimerService<AllocationID> timerService) {
- return new TaskSlotTable<>(
+ return new TaskSlotTableImpl<>(
numberOfSlots,
createTotalResourceProfile(numberOfSlots),
DEFAULT_RESOURCE_PROFILE,
@@ -70,6 +70,10 @@
return result;
}
+ public static TimerService<AllocationID> createDefaultTimerService() {
+ return createDefaultTimerService(DEFAULT_SLOT_TIMEOUT);
+ }
+
public static TimerService<AllocationID> createDefaultTimerService(long shutdownTimeout) {
return new TimerService<>(TestingUtils.defaultExecutor(), shutdownTimeout);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotPayload.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotPayload.java
new file mode 100644
index 0000000..2b362ed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotPayload.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.concurrent.CompletableFuture;
+
+class TestingTaskSlotPayload implements TaskSlotPayload {
+ private final JobID jobId;
+ private final ExecutionAttemptID executionAttemptID;
+ private final AllocationID allocationID;
+ private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+ private final OneShotLatch failLatch = new OneShotLatch();
+
+ TestingTaskSlotPayload() {
+ this(new JobID(), new ExecutionAttemptID(), new AllocationID());
+ }
+
+ TestingTaskSlotPayload(JobID jobId, ExecutionAttemptID executionAttemptID, AllocationID allocationID) {
+ this.jobId = jobId;
+ this.executionAttemptID = executionAttemptID;
+ this.allocationID = allocationID;
+ }
+
+ @Override
+ public JobID getJobID() {
+ return jobId;
+ }
+
+ @Override
+ public ExecutionAttemptID getExecutionId() {
+ return executionAttemptID;
+ }
+
+ @Override
+ public AllocationID getAllocationId() {
+ return allocationID;
+ }
+
+ @Override
+ public CompletableFuture<Void> getTerminationFuture() {
+ return terminationFuture;
+ }
+
+ @Override
+ public void failExternally(Throwable cause) {
+ failLatch.trigger();
+ }
+
+ void waitForFailure() throws InterruptedException {
+ failLatch.await();
+ }
+
+ TestingTaskSlotPayload terminate() {
+ terminationFuture.complete(null);
+ return this;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotTable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotTable.java
new file mode 100644
index 0000000..3618df9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotTable.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of {@link TaskSlotTable} for tests.
+ */
+public class TestingTaskSlotTable<T extends TaskSlotPayload> implements TaskSlotTable<T> {
+ private final Supplier<SlotReport> createSlotReportSupplier;
+ private final Supplier<Boolean> allocateSlotSupplier;
+ private final BiFunction<JobID, AllocationID, Boolean> tryMarkSlotActiveBiFunction;
+ private final Function<T, Boolean> addTaskFunction;
+ private final Function<AllocationID, MemoryManager> memoryManagerGetter;
+ private final Supplier<CompletableFuture<Void>> closeAsyncSupplier;
+
+ private TestingTaskSlotTable(
+ Supplier<SlotReport> createSlotReportSupplier,
+ Supplier<Boolean> allocateSlotSupplier,
+ BiFunction<JobID, AllocationID, Boolean> tryMarkSlotActiveBiFunction,
+ Function<T, Boolean> addTaskFunction,
+ Function<AllocationID, MemoryManager> memoryManagerGetter,
+ Supplier<CompletableFuture<Void>> closeAsyncSupplier) {
+ this.createSlotReportSupplier = createSlotReportSupplier;
+ this.allocateSlotSupplier = allocateSlotSupplier;
+ this.tryMarkSlotActiveBiFunction = tryMarkSlotActiveBiFunction;
+ this.addTaskFunction = addTaskFunction;
+ this.memoryManagerGetter = memoryManagerGetter;
+ this.closeAsyncSupplier = closeAsyncSupplier;
+ }
+
+ @Override
+ public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
+
+ }
+
+ @Override
+ public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SlotReport createSlotReport(ResourceID resourceId) {
+ return createSlotReportSupplier.get();
+ }
+
+ @Override
+ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
+ return allocateSlotSupplier.get();
+ }
+
+ @Override
+ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
+ return allocateSlotSupplier.get();
+ }
+
+ @Override
+ public boolean markSlotActive(AllocationID allocationId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int freeSlot(AllocationID allocationId, Throwable cause) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
+ return tryMarkSlotActiveBiFunction.apply(jobId, allocationId);
+ }
+
+ @Override
+ public boolean isSlotFree(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasAllocatedSlots(JobID jobId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public JobID getOwningJob(AllocationID allocationId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean addTask(T task) {
+ return addTaskFunction.apply(task);
+ }
+
+ @Override
+ public T removeTask(ExecutionAttemptID executionAttemptID) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T getTask(ExecutionAttemptID executionAttemptID) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<T> getTasks(JobID jobId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public AllocationID getCurrentAllocation(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MemoryManager getTaskMemoryManager(AllocationID allocationID) {
+ return memoryManagerGetter.apply(allocationID);
+ }
+
+ @Override
+ public void notifyTimeout(AllocationID key, UUID ticket) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return closeAsyncSupplier.get();
+ }
+
+ public static <T extends TaskSlotPayload> TestingTaskSlotTableBuilder<T> newBuilder() {
+ return new TestingTaskSlotTableBuilder<>();
+ }
+
+ /**
+ * Builder for {@link TestingTaskSlotTable}.
+ */
+ public static class TestingTaskSlotTableBuilder<T extends TaskSlotPayload> {
+ private Supplier<SlotReport> createSlotReportSupplier = SlotReport::new;
+ private Supplier<Boolean> allocateSlotSupplier = () -> false;
+ private BiFunction<JobID, AllocationID, Boolean> tryMarkSlotActiveBiFunction = (ignoredA, ignoredB) -> false;
+ private Function<T, Boolean> addTaskFunction = (ignored) -> false;
+ private Function<AllocationID, MemoryManager> memoryManagerGetter = ignored -> {
+ throw new UnsupportedOperationException("No memory manager getter has been set.");
+ };
+ private Supplier<CompletableFuture<Void>> closeAsyncSupplier = FutureUtils::completedVoidFuture;
+
+ public TestingTaskSlotTableBuilder<T> createSlotReportSupplier(Supplier<SlotReport> createSlotReportSupplier) {
+ this.createSlotReportSupplier = createSlotReportSupplier;
+ return this;
+ }
+
+ public TestingTaskSlotTableBuilder<T> allocateSlotReturns(boolean toReturn) {
+ this.allocateSlotSupplier = () -> toReturn;
+ return this;
+ }
+
+ public TestingTaskSlotTableBuilder<T> tryMarkSlotActiveReturns(boolean toReturn) {
+ this.tryMarkSlotActiveBiFunction = (stub1, stub2) -> toReturn;
+ return this;
+ }
+
+ public TestingTaskSlotTableBuilder<T> addTaskReturns(boolean toReturn) {
+ this.addTaskFunction = stub -> toReturn;
+ return this;
+ }
+
+ public TestingTaskSlotTableBuilder<T> memoryManagerGetterReturns(MemoryManager toReturn) {
+ this.memoryManagerGetter = stub -> toReturn;
+ return this;
+ }
+
+ public TestingTaskSlotTableBuilder<T> closeAsyncReturns(CompletableFuture<Void> toReturn) {
+ this.closeAsyncSupplier = () -> toReturn;
+ return this;
+ }
+
+ public TaskSlotTable<T> build() {
+ return new TestingTaskSlotTable<>(
+ createSlotReportSupplier,
+ allocateSlotSupplier,
+ tryMarkSlotActiveBiFunction,
+ addTaskFunction,
+ memoryManagerGetter,
+ closeAsyncSupplier);
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
index f907bc0..925b388 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -42,7 +42,6 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -84,6 +83,9 @@
private Collection<PermanentBlobKey> requiredJarFileBlobKeys = Collections.emptyList();
private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
+ private JobID jobId = new JobID();
+ private AllocationID allocationID = new AllocationID();
+ private ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
@@ -149,10 +151,23 @@
return this;
}
+ public TestTaskBuilder setJobId(JobID jobId) {
+ this.jobId = jobId;
+ return this;
+ }
+
+ public TestTaskBuilder setAllocationID(AllocationID allocationID) {
+ this.allocationID = allocationID;
+ return this;
+ }
+
+ public TestTaskBuilder setExecutionAttemptId(ExecutionAttemptID executionAttemptId) {
+ this.executionAttemptId = executionAttemptId;
+ return this;
+ }
+
public Task build() throws Exception {
- final JobID jobId = new JobID();
final JobVertexID jobVertexId = new JobVertexID();
- final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
@@ -182,7 +197,7 @@
jobInformation,
taskInformation,
executionAttemptId,
- new AllocationID(),
+ allocationID,
0,
0,
resultPartitions,