[FLINK-15247][Runtime] Wait for all slots to be free before task executor services shutdown upon stopping

[FLINK-15247][Runtime] Improve test coverage for slot allocation and task submission in TaskSlotTable

[hotfix][Runtime] Introduce TaskSlotTable interface

[hotfix][Runtime][Tests] Introduce StubTaskSlotTable for tests to replace mockito mocks

[hotfix][Runtime][Tests] Rework TaskExecutorTest#testTaskInterruptionAndTerminationOnShutdown to testTaskSlotTableTerminationOnShutdown

This closes #10682.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fe272ef..6de6747 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -139,7 +139,6 @@
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -188,8 +187,6 @@
 	/** The kvState registration service in the task manager. */
 	private final KvStateService kvStateService;
 
-	private final TaskCompletionTracker taskCompletionTracker;
-
 	// --------- job manager connections -----------
 
 	private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
@@ -279,7 +276,6 @@
 		this.resourceManagerAddress = null;
 		this.resourceManagerConnection = null;
 		this.currentRegistrationTimeoutId = null;
-		this.taskCompletionTracker = new TaskCompletionTracker();
 
 		final ResourceID resourceId = taskExecutorServices.getTaskManagerLocation().getResourceID();
 		this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
@@ -330,7 +326,7 @@
 			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 
 			// tell the task slot table who's responsible for the task slot actions
-			taskSlotTable.start(new SlotActionsImpl());
+			taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
 
 			// start the job leader service
 			jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
@@ -376,7 +372,7 @@
 
 		return FutureUtils
 			.runAfterwards(
-				taskCompletionTracker.failIncompleteTasksAndGetTerminationFuture(),
+				taskSlotTable.closeAsync(),
 				this::stopTaskExecutorServices)
   		    .handle((ignored, throwable) -> {
   		    	handleOnStopException(throwableBeforeTasksCompletion, throwable);
@@ -606,7 +602,6 @@
 
 			if (taskAdded) {
 				task.startTaskThread();
-				taskCompletionTracker.trackTaskCompletion(task);
 
 				setupResultPartitionBookkeeping(
 					tdd.getJobId(),
@@ -1856,30 +1851,4 @@
 			return new TaskExecutorHeartbeatPayload(taskSlotTable.createSlotReport(getResourceID()), partitionTracker.createClusterPartitionReport());
 		}
 	}
-
-	private static class TaskCompletionTracker {
-		private final Map<ExecutionAttemptID, Task> incompleteTasks;
-
-		private TaskCompletionTracker() {
-			incompleteTasks = new ConcurrentHashMap<>(8);
-		}
-
-		void trackTaskCompletion(Task task) {
-			incompleteTasks.put(task.getExecutionId(), task);
-			task.getTerminationFuture().thenRun(() -> incompleteTasks.remove(task.getExecutionId()));
-		}
-
-		CompletableFuture<Void> failIncompleteTasksAndGetTerminationFuture() {
-			FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
-			return FutureUtils.waitForAll(
-				incompleteTasks
-					.values()
-					.stream()
-					.map(task -> {
-						task.failExternally(cause);
-						return task.getTerminationFuture();
-					})
-					.collect(Collectors.toList()));
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 1d140b2..45d6f2c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -33,6 +33,7 @@
 import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -182,7 +183,7 @@
 		}
 
 		try {
-			taskSlotTable.stop();
+			taskSlotTable.close();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
@@ -287,7 +288,7 @@
 		final TimerService<AllocationID> timerService = new TimerService<>(
 			new ScheduledThreadPoolExecutor(1),
 			timerServiceShutdownTimeout);
-		return new TaskSlotTable<>(
+		return new TaskSlotTableImpl<>(
 			numberOfSlots,
 			TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(taskExecutorResourceSpec),
 			TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(taskExecutorResourceSpec, numberOfSlots),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 90e640d..779d6a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -22,8 +22,11 @@
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -33,6 +36,8 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /**
  * Container for multiple {@link TaskSlotPayload tasks} belonging to the same slot. A {@link TaskSlot} can be in one
@@ -55,7 +60,7 @@
  *
  * @param <T> type of the {@link TaskSlotPayload} stored in this slot
  */
-public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseable {
+public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseableAsync {
 	private static final Logger LOG = LoggerFactory.getLogger(TaskSlot.class);
 
 	/** Index of the task slot. */
@@ -78,6 +83,9 @@
 	/** Allocation id of this slot. */
 	private final AllocationID allocationId;
 
+	/** The closing future is completed when the slot is freed and closed. */
+	private final CompletableFuture<Void> closingFuture;
+
 	public TaskSlot(
 		final int index,
 		final ResourceProfile resourceProfile,
@@ -95,6 +103,8 @@
 		this.allocationId = allocationId;
 
 		this.memoryManager = createMemoryManager(resourceProfile, memoryPageSize);
+
+		this.closingFuture = new CompletableFuture<>();
 	}
 
 	// ----------------------------------------------------------------------------------
@@ -245,16 +255,6 @@
 	}
 
 	/**
-	 * Mark this slot as releasing. A slot can always be marked as releasing.
-	 *
-	 * @return True
-	 */
-	public boolean markReleasing() {
-		state = TaskSlotState.RELEASING;
-		return true;
-	}
-
-	/**
 	 * Generate the slot offer from this TaskSlot.
 	 *
 	 * @return The sot offer which this task slot can provide
@@ -274,9 +274,39 @@
 	}
 
 	@Override
-	public void close() {
-		verifyMemoryFreed();
-		this.memoryManager.shutdown();
+	public CompletableFuture<Void> closeAsync() {
+		return closeAsync(new FlinkException("Closing the slot"));
+	}
+
+	/**
+	 * Close the task slot asynchronously.
+	 *
+	 * <p>Slot is moved to {@link TaskSlotState#RELEASING} state and only once.
+	 * If there are active tasks running in the slot then they are failed.
+	 * The future of all tasks terminated and slot cleaned up is initiated only once and always returned
+	 * in case of multiple attempts to close the slot.
+	 *
+	 * @param cause cause of closing
+	 * @return future of all running task if any being done and slot cleaned up.
+	 */
+	CompletableFuture<Void> closeAsync(Throwable cause) {
+		if (!isReleasing()) {
+			state = TaskSlotState.RELEASING;
+			if (!isEmpty()) {
+				// we couldn't free the task slot because it still contains task, fail the tasks
+				// and set the slot state to releasing so that it gets eventually freed
+				tasks.values().forEach(task -> task.failExternally(cause));
+			}
+			final CompletableFuture<Void> cleanupFuture = FutureUtils
+				.waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
+				.thenRun(() -> {
+					verifyMemoryFreed();
+					this.memoryManager.shutdown();
+				});
+
+			FutureUtils.forward(cleanupFuture, closingFuture);
+		}
+		return closingFuture;
 	}
 
 	private void verifyMemoryFreed() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 00b9691..4fe033c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -22,29 +22,17 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceBudgetManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.util.AutoCloseableAsync;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
 
@@ -57,111 +45,14 @@
  *
  * <p>Before the task slot table can be used, it must be started via the {@link #start} method.
  */
-public class TaskSlotTable<T extends TaskSlotPayload> implements TimeoutListener<AllocationID> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);
-
-	/**
-	 * Number of slots in static slot allocation.
-	 * If slot is requested with an index, the requested index must within the range of [0, numberSlots).
-	 * When generating slot report, we should always generate slots with index in [0, numberSlots) even the slot does not exist.
-	 */
-	private final int numberSlots;
-
-	/** Slot resource profile for static slot allocation. */
-	private final ResourceProfile defaultSlotResourceProfile;
-
-	/** Page size for memory manager. */
-	private final int memoryPageSize;
-
-	/** Timer service used to time out allocated slots. */
-	private final TimerService<AllocationID> timerService;
-
-	/** The list of all task slots. */
-	private final Map<Integer, TaskSlot<T>> taskSlots;
-
-	/** Mapping from allocation id to task slot. */
-	private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
-
-	/** Mapping from execution attempt id to task and task slot. */
-	private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;
-
-	/** Mapping from job id to allocated slots for a job. */
-	private final Map<JobID, Set<AllocationID>> slotsPerJob;
-
-	/** Interface for slot actions, such as freeing them or timing them out. */
-	private SlotActions slotActions;
-
-	/** Whether the table has been started. */
-	private volatile boolean started;
-
-	private final ResourceBudgetManager budgetManager;
-
-	public TaskSlotTable(
-		final int numberSlots,
-		final ResourceProfile totalAvailableResourceProfile,
-		final ResourceProfile defaultSlotResourceProfile,
-		final int memoryPageSize,
-		final TimerService<AllocationID> timerService) {
-
-		Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0.");
-
-		this.numberSlots = numberSlots;
-		this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
-		this.memoryPageSize = memoryPageSize;
-
-		this.taskSlots = new HashMap<>(numberSlots);
-
-		this.timerService = Preconditions.checkNotNull(timerService);
-
-		budgetManager = new ResourceBudgetManager(Preconditions.checkNotNull(totalAvailableResourceProfile));
-
-		allocatedSlots = new HashMap<>(numberSlots);
-
-		taskSlotMappings = new HashMap<>(4 * numberSlots);
-
-		slotsPerJob = new HashMap<>(4);
-
-		slotActions = null;
-		started = false;
-	}
-
+public interface TaskSlotTable<T extends TaskSlotPayload> extends TimeoutListener<AllocationID>, AutoCloseableAsync {
 	/**
 	 * Start the task slot table with the given slot actions.
 	 *
 	 * @param initialSlotActions to use for slot actions
+	 * @param mainThreadExecutor {@link ComponentMainThreadExecutor} to schedule internal calls to the main thread
 	 */
-	public void start(SlotActions initialSlotActions) {
-		this.slotActions = Preconditions.checkNotNull(initialSlotActions);
-
-		timerService.start(this);
-
-		started = true;
-	}
-
-	/**
-	 * Stop the task slot table.
-	 */
-	public void stop() {
-		started = false;
-		timerService.stop();
-		allocatedSlots
-			.values()
-			.stream()
-			.filter(slot -> !taskSlots.containsKey(slot.getIndex()))
-			.forEach(TaskSlot::close);
-		allocatedSlots.clear();
-		taskSlots.values().forEach(TaskSlot::close);
-		taskSlots.clear();
-		slotActions = null;
-	}
-
-	@VisibleForTesting
-	public boolean isStopped() {
-		return !started &&
-			taskSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown()) &&
-			allocatedSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown());
-	}
+	void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor);
 
 	/**
 	 * Returns the all {@link AllocationID} for the given job.
@@ -169,65 +60,9 @@
 	 * @param jobId for which to return the set of {@link AllocationID}.
 	 * @return Set of {@link AllocationID} for the given job
 	 */
-	public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
-		final Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+	Set<AllocationID> getAllocationIdsPerJob(JobID jobId);
 
-		if (allocationIds == null) {
-			return Collections.emptySet();
-		} else {
-			return Collections.unmodifiableSet(allocationIds);
-		}
-	}
-
-	// ---------------------------------------------------------------------
-	// Slot report methods
-	// ---------------------------------------------------------------------
-
-	public SlotReport createSlotReport(ResourceID resourceId) {
-		List<SlotStatus> slotStatuses = new ArrayList<>();
-
-		for (int i = 0; i < numberSlots; i++) {
-			SlotID slotId = new SlotID(resourceId, i);
-			SlotStatus slotStatus;
-			if (taskSlots.containsKey(i)) {
-				TaskSlot<T> taskSlot = taskSlots.get(i);
-
-				slotStatus = new SlotStatus(
-					slotId,
-					taskSlot.getResourceProfile(),
-					taskSlot.getJobId(),
-					taskSlot.getAllocationId());
-			} else {
-				slotStatus = new SlotStatus(
-					slotId,
-					defaultSlotResourceProfile,
-					null,
-					null);
-			}
-
-			slotStatuses.add(slotStatus);
-		}
-
-		for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
-			if (taskSlot.getIndex() < 0) {
-				SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
-				SlotStatus slotStatus = new SlotStatus(
-					slotID,
-					taskSlot.getResourceProfile(),
-					taskSlot.getJobId(),
-					taskSlot.getAllocationId());
-				slotStatuses.add(slotStatus);
-			}
-		}
-
-		final SlotReport slotReport = new SlotReport(slotStatuses);
-
-		return slotReport;
-	}
-
-	// ---------------------------------------------------------------------
-	// Slot methods
-	// ---------------------------------------------------------------------
+	SlotReport createSlotReport(ResourceID resourceId);
 
 	/**
 	 * Allocate the slot with the given index for the given job and allocation id. If negative index is
@@ -241,9 +76,7 @@
 	 * @return True if the task slot could be allocated; otherwise false
 	 */
 	@VisibleForTesting
-	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
-		return allocateSlot(index, jobId, allocationId, defaultSlotResourceProfile, slotTimeout);
-	}
+	boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout);
 
 	/**
 	 * Allocate the slot with the given index for the given job and allocation id. If negative index is
@@ -257,64 +90,12 @@
 	 * @param slotTimeout until the slot times out
 	 * @return True if the task slot could be allocated; otherwise false
 	 */
-	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
-		checkInit();
-
-		Preconditions.checkArgument(index < numberSlots);
-
-		TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
-		if (taskSlot != null) {
-			LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
-			return false;
-		}
-
-		if (taskSlots.containsKey(index)) {
-			TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
-			LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
-				index,
-				duplicatedTaskSlot.getResourceProfile(),
-				duplicatedTaskSlot.getJobId(),
-				duplicatedTaskSlot.getAllocationId());
-			return duplicatedTaskSlot.getJobId().equals(jobId) &&
-				duplicatedTaskSlot.getAllocationId().equals(allocationId);
-		} else if (allocatedSlots.containsKey(allocationId)) {
-			return true;
-		}
-
-		resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
-
-		if (!budgetManager.reserve(resourceProfile)) {
-			LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "
-					+ "while the currently remaining available resources are {}, total is {}.",
-				resourceProfile,
-				budgetManager.getAvailableBudget(),
-				budgetManager.getTotalBudget());
-			return false;
-		}
-
-		taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
-		if (index >= 0) {
-			taskSlots.put(index, taskSlot);
-		}
-
-		// update the allocation id to task slot map
-		allocatedSlots.put(allocationId, taskSlot);
-
-		// register a timeout for this slot since it's in state allocated
-		timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
-
-		// add this slot to the set of job slots
-		Set<AllocationID> slots = slotsPerJob.get(jobId);
-
-		if (slots == null) {
-			slots = new HashSet<>(4);
-			slotsPerJob.put(jobId, slots);
-		}
-
-		slots.add(allocationId);
-
-		return true;
-	}
+	boolean allocateSlot(
+		int index,
+		JobID jobId,
+		AllocationID allocationId,
+		ResourceProfile resourceProfile,
+		Time slotTimeout);
 
 	/**
 	 * Marks the slot under the given allocation id as active. If the slot could not be found, then
@@ -324,26 +105,7 @@
 	 * @throws SlotNotFoundException if the slot could not be found for the given allocation id
 	 * @return True if the slot could be marked active; otherwise false
 	 */
-	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
-		checkInit();
-
-		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
-		if (taskSlot != null) {
-			if (taskSlot.markActive()) {
-				// unregister a potential timeout
-				LOG.info("Activate slot {}.", allocationId);
-
-				timerService.unregisterTimeout(allocationId);
-
-				return true;
-			} else {
-				return false;
-			}
-		} else {
-			throw new SlotNotFoundException(allocationId);
-		}
-	}
+	boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException;
 
 	/**
 	 * Marks the slot under the given allocation id as inactive. If the slot could not be found,
@@ -354,24 +116,7 @@
 	 * @throws SlotNotFoundException if the slot could not be found for the given allocation id
 	 * @return True if the slot could be marked inactive
 	 */
-	public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
-		checkInit();
-
-		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
-		if (taskSlot != null) {
-			if (taskSlot.markInactive()) {
-				// register a timeout to free the slot
-				timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
-
-				return true;
-			} else {
-				return false;
-			}
-		} else {
-			throw new SlotNotFoundException(allocationId);
-		}
-	}
+	boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException;
 
 	/**
 	 * Try to free the slot. If the slot is empty it will set the state of the task slot to free
@@ -382,7 +127,7 @@
 	 * @throws SlotNotFoundException if there is not task slot for the given allocation id
 	 * @return Index of the freed slot if the slot could be freed; otherwise -1
 	 */
-	public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+	default int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
 		return freeSlot(allocationId, new Exception("The task slot of this task is being freed."));
 	}
 
@@ -394,64 +139,9 @@
 	 * @param allocationId identifying the task slot to be freed
 	 * @param cause to fail the tasks with if slot is not empty
 	 * @throws SlotNotFoundException if there is not task slot for the given allocation id
-	 * @return The freed TaskSlot. If the TaskSlot cannot be freed then null.
+	 * @return Index of the freed slot if the slot could be freed; otherwise -1
 	 */
-	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
-		checkInit();
-
-		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
-		if (taskSlot != null) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Free slot {}.", taskSlot, cause);
-			} else {
-				LOG.info("Free slot {}.", taskSlot);
-			}
-
-			final JobID jobId = taskSlot.getJobId();
-
-			if (taskSlot.isEmpty()) {
-				// remove the allocation id to task slot mapping
-				allocatedSlots.remove(allocationId);
-
-				// unregister a potential timeout
-				timerService.unregisterTimeout(allocationId);
-
-				Set<AllocationID> slots = slotsPerJob.get(jobId);
-
-				if (slots == null) {
-					throw new IllegalStateException("There are no more slots allocated for the job " + jobId +
-						". This indicates a programming bug.");
-				}
-
-				slots.remove(allocationId);
-
-				if (slots.isEmpty()) {
-					slotsPerJob.remove(jobId);
-				}
-
-				taskSlot.close();
-				taskSlots.remove(taskSlot.getIndex());
-				budgetManager.release(taskSlot.getResourceProfile());
-
-				return taskSlot.getIndex();
-			} else {
-				// we couldn't free the task slot because it still contains task, fail the tasks
-				// and set the slot state to releasing so that it gets eventually freed
-				taskSlot.markReleasing();
-
-				Iterator<T> taskIterator = taskSlot.getTasks();
-
-				while (taskIterator.hasNext()) {
-					taskIterator.next().failExternally(cause);
-				}
-
-				return -1;
-			}
-		} else {
-			throw new SlotNotFoundException(allocationId);
-		}
-	}
+	int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException;
 
 	/**
 	 * Check whether the timeout with ticket is valid for the given allocation id.
@@ -460,11 +150,7 @@
 	 * @param ticket of the timeout
 	 * @return True if the timeout is valid; otherwise false
 	 */
-	public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
-		checkInit();
-
-		return timerService.isValid(allocationId, ticket);
-	}
+	boolean isValidTimeout(AllocationID allocationId, UUID ticket);
 
 	/**
 	 * Check whether the slot for the given index is allocated for the given job and allocation id.
@@ -474,16 +160,7 @@
 	 * @param allocationId which should match the task slot's allocation id
 	 * @return True if the given task slot is allocated for the given job and allocation id
 	 */
-	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
-		TaskSlot<T> taskSlot = taskSlots.get(index);
-		if (taskSlot != null) {
-			return taskSlot.isAllocated(jobId, allocationId);
-		} else if (index < 0) {
-			return allocatedSlots.containsKey(allocationId);
-		} else {
-			return false;
-		}
-	}
+	boolean isAllocated(int index, JobID jobId, AllocationID allocationId);
 
 	/**
 	 * Try to mark the specified slot as active if it has been allocated by the given job.
@@ -492,15 +169,7 @@
 	 * @param allocationId identifying the allocation
 	 * @return True if the task slot could be marked active.
 	 */
-	public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
-		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
-		if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
-			return taskSlot.markActive();
-		} else {
-			return false;
-		}
-	}
+	boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId);
 
 	/**
 	 * Check whether the task slot with the given index is free.
@@ -508,9 +177,7 @@
 	 * @param index of the task slot
 	 * @return True if the task slot is free; otherwise false
 	 */
-	public boolean isSlotFree(int index) {
-		return !taskSlots.containsKey(index);
-	}
+	boolean isSlotFree(int index);
 
 	/**
 	 * Check whether the job has allocated (not active) slots.
@@ -518,9 +185,7 @@
 	 * @param jobId for which to check for allocated slots
 	 * @return True if there are allocated slots for the given job id.
 	 */
-	public boolean hasAllocatedSlots(JobID jobId) {
-		return getAllocatedSlots(jobId).hasNext();
-	}
+	boolean hasAllocatedSlots(JobID jobId);
 
 	/**
 	 * Return an iterator of allocated slots for the given job id.
@@ -528,9 +193,7 @@
 	 * @param jobId for which to return the allocated slots
 	 * @return Iterator of allocated slots.
 	 */
-	public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
-		return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
-	}
+	Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId);
 
 	/**
 	 * Return an iterator of active slots (their application ids) for the given job id.
@@ -538,9 +201,7 @@
 	 * @param jobId for which to return the active slots
 	 * @return Iterator of allocation ids of active slots
 	 */
-	public Iterator<AllocationID> getActiveSlots(JobID jobId) {
-		return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
-	}
+	Iterator<AllocationID> getActiveSlots(JobID jobId);
 
 	/**
 	 * Returns the owning job of the {@link TaskSlot} identified by the
@@ -551,19 +212,7 @@
 	 * the given allocation id or if the slot has no owning job assigned
 	 */
 	@Nullable
-	public JobID getOwningJob(AllocationID allocationId) {
-		final TaskSlot<T> taskSlot = getTaskSlot(allocationId);
-
-		if (taskSlot != null) {
-			return taskSlot.getJobId();
-		} else {
-			return null;
-		}
-	}
-
-	// ---------------------------------------------------------------------
-	// Task methods
-	// ---------------------------------------------------------------------
+	JobID getOwningJob(AllocationID allocationId);
 
 	/**
 	 * Add the given task to the slot identified by the task's allocation id.
@@ -573,27 +222,7 @@
 	 * @throws SlotNotActiveException if there was no slot active for task's job and allocation id
 	 * @return True if the task could be added to the task slot; otherwise false
 	 */
-	public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
-		Preconditions.checkNotNull(task);
-
-		TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
-
-		if (taskSlot != null) {
-			if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
-				if (taskSlot.add(task)) {
-					taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));
-
-					return true;
-				} else {
-					return false;
-				}
-			} else {
-				throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
-			}
-		} else {
-			throw new SlotNotFoundException(task.getAllocationId());
-		}
-	}
+	boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException;
 
 	/**
 	 * Remove the task with the given execution attempt id from its task slot. If the owning task
@@ -603,26 +232,7 @@
 	 * @param executionAttemptID identifying the task to remove
 	 * @return The removed task if there is any for the given execution attempt id; otherwise null
 	 */
-	public T removeTask(ExecutionAttemptID executionAttemptID) {
-		checkInit();
-
-		TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
-
-		if (taskSlotMapping != null) {
-			T task = taskSlotMapping.getTask();
-			TaskSlot<T> taskSlot = taskSlotMapping.getTaskSlot();
-
-			taskSlot.remove(task.getExecutionId());
-
-			if (taskSlot.isReleasing() && taskSlot.isEmpty()) {
-				slotActions.freeSlot(taskSlot.getAllocationId());
-			}
-
-			return task;
-		} else {
-			return null;
-		}
-	}
+	T removeTask(ExecutionAttemptID executionAttemptID);
 
 	/**
 	 * Get the task for the given execution attempt id. If none could be found, then return null.
@@ -630,15 +240,7 @@
 	 * @param executionAttemptID identifying the requested task
 	 * @return The task for the given execution attempt id if it exist; otherwise null
 	 */
-	public T getTask(ExecutionAttemptID executionAttemptID) {
-		TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.get(executionAttemptID);
-
-		if (taskSlotMapping != null) {
-			return taskSlotMapping.getTask();
-		} else {
-			return null;
-		}
-	}
+	T getTask(ExecutionAttemptID executionAttemptID);
 
 	/**
 	 * Return an iterator over all tasks for a given job.
@@ -646,9 +248,7 @@
 	 * @param jobId identifying the job of the requested tasks
 	 * @return Iterator over all task for a given job
 	 */
-	public Iterator<T> getTasks(JobID jobId) {
-		return new TaskIterator(jobId);
-	}
+	Iterator<T> getTasks(JobID jobId);
 
 	/**
 	 * Get the current allocation for the task slot with the given index.
@@ -656,13 +256,7 @@
 	 * @param index identifying the slot for which the allocation id shall be retrieved
 	 * @return Allocation id of the specified slot if allocated; otherwise null
 	 */
-	public AllocationID getCurrentAllocation(int index) {
-		TaskSlot<T> taskSlot = taskSlots.get(index);
-		if (taskSlot == null) {
-			return null;
-		}
-		return taskSlot.getAllocationId();
-	}
+	AllocationID getCurrentAllocation(int index);
 
 	/**
 	 * Get the memory manager of the slot allocated for the task.
@@ -670,217 +264,5 @@
 	 * @param allocationID allocation id of the slot allocated for the task
 	 * @return the memory manager of the slot allocated for the task
 	 */
-	public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
-		TaskSlot<T> taskSlot = getTaskSlot(allocationID);
-		if (taskSlot != null) {
-			return taskSlot.getMemoryManager();
-		} else {
-			throw new SlotNotFoundException(allocationID);
-		}
-	}
-
-	// ---------------------------------------------------------------------
-	// TimeoutListener methods
-	// ---------------------------------------------------------------------
-
-	@Override
-	public void notifyTimeout(AllocationID key, UUID ticket) {
-		checkInit();
-
-		if (slotActions != null) {
-			slotActions.timeoutSlot(key, ticket);
-		}
-	}
-
-	// ---------------------------------------------------------------------
-	// Internal methods
-	// ---------------------------------------------------------------------
-
-	@Nullable
-	private TaskSlot<T> getTaskSlot(AllocationID allocationId) {
-		Preconditions.checkNotNull(allocationId);
-
-		return allocatedSlots.get(allocationId);
-	}
-
-	private void checkInit() {
-		Preconditions.checkState(started, "The %s has to be started.", TaskSlotTable.class.getSimpleName());
-	}
-
-	// ---------------------------------------------------------------------
-	// Static utility classes
-	// ---------------------------------------------------------------------
-
-	/**
-	 * Mapping class between a {@link TaskSlotPayload} and a {@link TaskSlot}.
-	 */
-	private static final class TaskSlotMapping<T extends TaskSlotPayload> {
-		private final T task;
-		private final TaskSlot<T> taskSlot;
-
-		private TaskSlotMapping(T task, TaskSlot<T> taskSlot) {
-			this.task = Preconditions.checkNotNull(task);
-			this.taskSlot = Preconditions.checkNotNull(taskSlot);
-		}
-
-		public T getTask() {
-			return task;
-		}
-
-		public TaskSlot<T> getTaskSlot() {
-			return taskSlot;
-		}
-	}
-
-	/**
-	 * Iterator over {@link AllocationID} of the {@link TaskSlot} of a given job. Additionally,
-	 * the task slots identified by the allocation ids are in the given state.
-	 */
-	private final class AllocationIDIterator implements Iterator<AllocationID> {
-		private final Iterator<TaskSlot<T>> iterator;
-
-		private AllocationIDIterator(JobID jobId, TaskSlotState state) {
-			iterator = new TaskSlotIterator(jobId, state);
-		}
-
-		@Override
-		public boolean hasNext() {
-			return iterator.hasNext();
-		}
-
-		@Override
-		public AllocationID next() {
-			try {
-				return iterator.next().getAllocationId();
-			} catch (NoSuchElementException e) {
-				throw new NoSuchElementException("No more allocation ids.");
-			}
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException("Cannot remove allocation ids via this iterator.");
-		}
-	}
-
-	/**
-	 * Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given
-	 * job.
-	 */
-	private final class TaskSlotIterator implements Iterator<TaskSlot<T>> {
-		private final Iterator<AllocationID> allSlots;
-		private final TaskSlotState state;
-
-		private TaskSlot<T> currentSlot;
-
-		private TaskSlotIterator(JobID jobId, TaskSlotState state) {
-
-			Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
-
-			if (allocationIds == null || allocationIds.isEmpty()) {
-				allSlots = Collections.emptyIterator();
-			} else {
-				allSlots = allocationIds.iterator();
-			}
-
-			this.state = Preconditions.checkNotNull(state);
-
-			this.currentSlot = null;
-		}
-
-		@Override
-		public boolean hasNext() {
-			while (currentSlot == null && allSlots.hasNext()) {
-				AllocationID tempSlot = allSlots.next();
-
-				TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
-
-				if (taskSlot != null && taskSlot.getState() == state) {
-					currentSlot = taskSlot;
-				}
-			}
-
-			return currentSlot != null;
-		}
-
-		@Override
-		public TaskSlot<T> next() {
-			if (currentSlot != null) {
-				TaskSlot<T> result = currentSlot;
-
-				currentSlot = null;
-
-				return result;
-			} else {
-				while (true) {
-					AllocationID tempSlot;
-
-					try {
-						tempSlot = allSlots.next();
-					} catch (NoSuchElementException e) {
-						throw new NoSuchElementException("No more task slots.");
-					}
-
-					TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
-
-					if (taskSlot != null && taskSlot.getState() == state) {
-						return taskSlot;
-					}
-				}
-			}
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException("Cannot remove task slots via this iterator.");
-		}
-	}
-
-	/**
-	 * Iterator over all {@link TaskSlotPayload} for a given job.
-	 */
-	private final class TaskIterator implements Iterator<T> {
-		private final Iterator<TaskSlot<T>> taskSlotIterator;
-
-		private Iterator<T> currentTasks;
-
-		private TaskIterator(JobID jobId) {
-			this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
-
-			this.currentTasks = null;
-		}
-
-		@Override
-		public boolean hasNext() {
-			while ((currentTasks == null || !currentTasks.hasNext()) && taskSlotIterator.hasNext()) {
-				TaskSlot<T> taskSlot = taskSlotIterator.next();
-
-				currentTasks = taskSlot.getTasks();
-			}
-
-			return (currentTasks != null && currentTasks.hasNext());
-		}
-
-		@Override
-		public T next() {
-			while ((currentTasks == null || !currentTasks.hasNext())) {
-				TaskSlot<T> taskSlot;
-
-				try {
-					taskSlot = taskSlotIterator.next();
-				} catch (NoSuchElementException e) {
-					throw new NoSuchElementException("No more tasks.");
-				}
-
-				currentTasks = taskSlot.getTasks();
-			}
-
-			return currentTasks.next();
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");
-		}
-	}
+	MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
new file mode 100644
index 0000000..873c1ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
@@ -0,0 +1,773 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceBudgetManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor.DummyComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link TaskSlotTable}.
+ */
+public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTable<T> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTableImpl.class);
+
+	/**
+	 * Number of slots in static slot allocation.
+	 * If slot is requested with an index, the requested index must within the range of [0, numberSlots).
+	 * When generating slot report, we should always generate slots with index in [0, numberSlots) even the slot does not exist.
+	 */
+	private final int numberSlots;
+
+	/** Slot resource profile for static slot allocation. */
+	private final ResourceProfile defaultSlotResourceProfile;
+
+	/** Page size for memory manager. */
+	private final int memoryPageSize;
+
+	/** Timer service used to time out allocated slots. */
+	private final TimerService<AllocationID> timerService;
+
+	/** The list of all task slots. */
+	private final Map<Integer, TaskSlot<T>> taskSlots;
+
+	/** Mapping from allocation id to task slot. */
+	private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
+
+	/** Mapping from execution attempt id to task and task slot. */
+	private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;
+
+	/** Mapping from job id to allocated slots for a job. */
+	private final Map<JobID, Set<AllocationID>> slotsPerJob;
+
+	/** Interface for slot actions, such as freeing them or timing them out. */
+	@Nullable
+	private SlotActions slotActions;
+
+	/** The table state. */
+	private volatile State state;
+
+	private final ResourceBudgetManager budgetManager;
+
+	/** The closing future is completed when all slot are freed and state is closed. */
+	private final CompletableFuture<Void> closingFuture;
+
+	/** {@link ComponentMainThreadExecutor} to schedule internal calls to the main thread. */
+	private ComponentMainThreadExecutor mainThreadExecutor = new DummyComponentMainThreadExecutor(
+		"TaskSlotTableImpl is not initialized with proper main thread executor, " +
+			"call to TaskSlotTableImpl#start is required");
+
+	public TaskSlotTableImpl(
+			final int numberSlots,
+			final ResourceProfile totalAvailableResourceProfile,
+			final ResourceProfile defaultSlotResourceProfile,
+			final int memoryPageSize,
+			final TimerService<AllocationID> timerService) {
+		Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0.");
+
+		this.numberSlots = numberSlots;
+		this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
+		this.memoryPageSize = memoryPageSize;
+
+		this.taskSlots = new HashMap<>(numberSlots);
+
+		this.timerService = Preconditions.checkNotNull(timerService);
+
+		budgetManager = new ResourceBudgetManager(Preconditions.checkNotNull(totalAvailableResourceProfile));
+
+		allocatedSlots = new HashMap<>(numberSlots);
+
+		taskSlotMappings = new HashMap<>(4 * numberSlots);
+
+		slotsPerJob = new HashMap<>(4);
+
+		slotActions = null;
+		state = State.CREATED;
+		closingFuture = new CompletableFuture<>();
+	}
+
+	@Override
+	public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
+		Preconditions.checkState(
+			state == State.CREATED,
+			"The %s has to be just created before starting",
+			TaskSlotTableImpl.class.getSimpleName());
+		this.slotActions = Preconditions.checkNotNull(initialSlotActions);
+		this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
+
+		timerService.start(this);
+
+		state = State.RUNNING;
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		if (state == State.CREATED) {
+			state = State.CLOSED;
+			closingFuture.complete(null);
+		} else if (state == State.RUNNING) {
+			state = State.CLOSING;
+			final FlinkException cause = new FlinkException("Closing task slot table");
+			CompletableFuture<Void> cleanupFuture = FutureUtils
+				.waitForAll(
+					new ArrayList<>(allocatedSlots.values())
+						.stream()
+						.map(slot -> freeSlotInternal(slot, cause))
+						.collect(Collectors.toList()))
+				.thenRunAsync(
+					() -> {
+						state = State.CLOSED;
+						timerService.stop();
+					},
+					mainThreadExecutor);
+			FutureUtils.forward(cleanupFuture, closingFuture);
+		}
+		return closingFuture;
+	}
+
+	@VisibleForTesting
+	public boolean isClosed() {
+		return state == State.CLOSED;
+	}
+
+	@Override
+	public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+		final Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+
+		if (allocationIds == null) {
+			return Collections.emptySet();
+		} else {
+			return Collections.unmodifiableSet(allocationIds);
+		}
+	}
+
+	// ---------------------------------------------------------------------
+	// Slot report methods
+	// ---------------------------------------------------------------------
+
+	@Override
+	public SlotReport createSlotReport(ResourceID resourceId) {
+		List<SlotStatus> slotStatuses = new ArrayList<>();
+
+		for (int i = 0; i < numberSlots; i++) {
+			SlotID slotId = new SlotID(resourceId, i);
+			SlotStatus slotStatus;
+			if (taskSlots.containsKey(i)) {
+				TaskSlot<T> taskSlot = taskSlots.get(i);
+
+				slotStatus = new SlotStatus(
+					slotId,
+					taskSlot.getResourceProfile(),
+					taskSlot.getJobId(),
+					taskSlot.getAllocationId());
+			} else {
+				slotStatus = new SlotStatus(
+					slotId,
+					defaultSlotResourceProfile,
+					null,
+					null);
+			}
+
+			slotStatuses.add(slotStatus);
+		}
+
+		for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
+			if (taskSlot.getIndex() < 0) {
+				SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
+				SlotStatus slotStatus = new SlotStatus(
+					slotID,
+					taskSlot.getResourceProfile(),
+					taskSlot.getJobId(),
+					taskSlot.getAllocationId());
+				slotStatuses.add(slotStatus);
+			}
+		}
+
+		final SlotReport slotReport = new SlotReport(slotStatuses);
+
+		return slotReport;
+	}
+
+	// ---------------------------------------------------------------------
+	// Slot methods
+	// ---------------------------------------------------------------------
+
+	@VisibleForTesting
+	@Override
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
+		return allocateSlot(index, jobId, allocationId, defaultSlotResourceProfile, slotTimeout);
+	}
+
+	@Override
+	public boolean allocateSlot(
+			int index,
+			JobID jobId,
+			AllocationID allocationId,
+			ResourceProfile resourceProfile,
+			Time slotTimeout) {
+		checkRunning();
+
+		Preconditions.checkArgument(index < numberSlots);
+
+		TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
+		if (taskSlot != null) {
+			LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
+			return false;
+		}
+
+		if (taskSlots.containsKey(index)) {
+			TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
+			LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
+				index,
+				duplicatedTaskSlot.getResourceProfile(),
+				duplicatedTaskSlot.getJobId(),
+				duplicatedTaskSlot.getAllocationId());
+			return duplicatedTaskSlot.getJobId().equals(jobId) &&
+				duplicatedTaskSlot.getAllocationId().equals(allocationId);
+		} else if (allocatedSlots.containsKey(allocationId)) {
+			return true;
+		}
+
+		resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
+
+		if (!budgetManager.reserve(resourceProfile)) {
+			LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "
+					+ "while the currently remaining available resources are {}, total is {}.",
+				resourceProfile,
+				budgetManager.getAvailableBudget(),
+				budgetManager.getTotalBudget());
+			return false;
+		}
+
+		taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
+		if (index >= 0) {
+			taskSlots.put(index, taskSlot);
+		}
+
+		// update the allocation id to task slot map
+		allocatedSlots.put(allocationId, taskSlot);
+
+		// register a timeout for this slot since it's in state allocated
+		timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
+
+		// add this slot to the set of job slots
+		Set<AllocationID> slots = slotsPerJob.get(jobId);
+
+		if (slots == null) {
+			slots = new HashSet<>(4);
+			slotsPerJob.put(jobId, slots);
+		}
+
+		slots.add(allocationId);
+
+		return true;
+	}
+
+	@Override
+	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
+		checkRunning();
+
+		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null) {
+			if (taskSlot.markActive()) {
+				// unregister a potential timeout
+				LOG.info("Activate slot {}.", allocationId);
+
+				timerService.unregisterTimeout(allocationId);
+
+				return true;
+			} else {
+				return false;
+			}
+		} else {
+			throw new SlotNotFoundException(allocationId);
+		}
+	}
+
+	@Override
+	public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
+		checkStarted();
+
+		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null) {
+			if (taskSlot.markInactive()) {
+				// register a timeout to free the slot
+				timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
+
+				return true;
+			} else {
+				return false;
+			}
+		} else {
+			throw new SlotNotFoundException(allocationId);
+		}
+	}
+
+	@Override
+	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+		checkStarted();
+
+		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null) {
+			return freeSlotInternal(taskSlot, cause).isDone() ? taskSlot.getIndex() : -1;
+		} else {
+			throw new SlotNotFoundException(allocationId);
+		}
+	}
+
+	private CompletableFuture<Void> freeSlotInternal(TaskSlot<T> taskSlot, Throwable cause) {
+		AllocationID allocationId = taskSlot.getAllocationId();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Free slot {}.", taskSlot, cause);
+		} else {
+			LOG.info("Free slot {}.", taskSlot);
+		}
+
+		if (taskSlot.isEmpty()) {
+			// remove the allocation id to task slot mapping
+			allocatedSlots.remove(allocationId);
+
+			// unregister a potential timeout
+			timerService.unregisterTimeout(allocationId);
+
+			JobID jobId = taskSlot.getJobId();
+			Set<AllocationID> slots = slotsPerJob.get(jobId);
+
+			if (slots == null) {
+				throw new IllegalStateException("There are no more slots allocated for the job " + jobId +
+					". This indicates a programming bug.");
+			}
+
+			slots.remove(allocationId);
+
+			if (slots.isEmpty()) {
+				slotsPerJob.remove(jobId);
+			}
+
+			taskSlots.remove(taskSlot.getIndex());
+			budgetManager.release(taskSlot.getResourceProfile());
+		}
+		return taskSlot.closeAsync(cause);
+	}
+
+	@Override
+	public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+		checkStarted();
+
+		return state == State.RUNNING && timerService.isValid(allocationId, ticket);
+	}
+
+	@Override
+	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+		TaskSlot<T> taskSlot = taskSlots.get(index);
+		if (taskSlot != null) {
+			return taskSlot.isAllocated(jobId, allocationId);
+		} else if (index < 0) {
+			return allocatedSlots.containsKey(allocationId);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
+		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
+			return taskSlot.markActive();
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean isSlotFree(int index) {
+		return !taskSlots.containsKey(index);
+	}
+
+	@Override
+	public boolean hasAllocatedSlots(JobID jobId) {
+		return getAllocatedSlots(jobId).hasNext();
+	}
+
+	@Override
+	public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
+		return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
+	}
+
+	@Override
+	public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+		return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
+	}
+
+	@Override
+	@Nullable
+	public JobID getOwningJob(AllocationID allocationId) {
+		final TaskSlot<T> taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null) {
+			return taskSlot.getJobId();
+		} else {
+			return null;
+		}
+	}
+
+	// ---------------------------------------------------------------------
+	// Task methods
+	// ---------------------------------------------------------------------
+
+	@Override
+	public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
+		checkRunning();
+		Preconditions.checkNotNull(task);
+
+		TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
+
+		if (taskSlot != null) {
+			if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
+				if (taskSlot.add(task)) {
+					taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));
+
+					return true;
+				} else {
+					return false;
+				}
+			} else {
+				throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
+			}
+		} else {
+			throw new SlotNotFoundException(task.getAllocationId());
+		}
+	}
+
+	@Override
+	public T removeTask(ExecutionAttemptID executionAttemptID) {
+		checkStarted();
+
+		TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
+
+		if (taskSlotMapping != null) {
+			T task = taskSlotMapping.getTask();
+			TaskSlot<T> taskSlot = taskSlotMapping.getTaskSlot();
+
+			taskSlot.remove(task.getExecutionId());
+
+			if (taskSlot.isReleasing() && taskSlot.isEmpty()) {
+				slotActions.freeSlot(taskSlot.getAllocationId());
+			}
+
+			return task;
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	public T getTask(ExecutionAttemptID executionAttemptID) {
+		TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.get(executionAttemptID);
+
+		if (taskSlotMapping != null) {
+			return taskSlotMapping.getTask();
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	public Iterator<T> getTasks(JobID jobId) {
+		return new PayloadIterator(jobId);
+	}
+
+	@Override
+	public AllocationID getCurrentAllocation(int index) {
+		TaskSlot<T> taskSlot = taskSlots.get(index);
+		if (taskSlot == null) {
+			return null;
+		}
+		return taskSlot.getAllocationId();
+	}
+
+	@Override
+	public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
+		TaskSlot<T> taskSlot = getTaskSlot(allocationID);
+		if (taskSlot != null) {
+			return taskSlot.getMemoryManager();
+		} else {
+			throw new SlotNotFoundException(allocationID);
+		}
+	}
+
+	// ---------------------------------------------------------------------
+	// TimeoutListener methods
+	// ---------------------------------------------------------------------
+
+	@Override
+	public void notifyTimeout(AllocationID key, UUID ticket) {
+		checkStarted();
+
+		if (slotActions != null) {
+			slotActions.timeoutSlot(key, ticket);
+		}
+	}
+
+	// ---------------------------------------------------------------------
+	// Internal methods
+	// ---------------------------------------------------------------------
+
+	@Nullable
+	private TaskSlot<T> getTaskSlot(AllocationID allocationId) {
+		Preconditions.checkNotNull(allocationId);
+
+		return allocatedSlots.get(allocationId);
+	}
+
+	private void checkRunning() {
+		Preconditions.checkState(
+			state == State.RUNNING,
+			"The %s has to be running.", TaskSlotTableImpl.class.getSimpleName());
+	}
+
+	private void checkStarted() {
+		Preconditions.checkState(
+			state != State.CREATED,
+			"The %s has to be started (not created).", TaskSlotTableImpl.class.getSimpleName());
+	}
+
+	// ---------------------------------------------------------------------
+	// Static utility classes
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Mapping class between a {@link TaskSlotPayload} and a {@link TaskSlot}.
+	 */
+	private static final class TaskSlotMapping<T extends TaskSlotPayload> {
+		private final T task;
+		private final TaskSlot<T> taskSlot;
+
+		private TaskSlotMapping(T task, TaskSlot<T> taskSlot) {
+			this.task = Preconditions.checkNotNull(task);
+			this.taskSlot = Preconditions.checkNotNull(taskSlot);
+		}
+
+		public T getTask() {
+			return task;
+		}
+
+		public TaskSlot<T> getTaskSlot() {
+			return taskSlot;
+		}
+	}
+
+	/**
+	 * Iterator over {@link AllocationID} of the {@link TaskSlot} of a given job. Additionally,
+	 * the task slots identified by the allocation ids are in the given state.
+	 */
+	private final class AllocationIDIterator implements Iterator<AllocationID> {
+		private final Iterator<TaskSlot<T>> iterator;
+
+		private AllocationIDIterator(JobID jobId, TaskSlotState state) {
+			iterator = new TaskSlotIterator(jobId, state);
+		}
+
+		@Override
+		public boolean hasNext() {
+			return iterator.hasNext();
+		}
+
+		@Override
+		public AllocationID next() {
+			try {
+				return iterator.next().getAllocationId();
+			} catch (NoSuchElementException e) {
+				throw new NoSuchElementException("No more allocation ids.");
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Cannot remove allocation ids via this iterator.");
+		}
+	}
+
+	/**
+	 * Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given
+	 * job.
+	 */
+	private final class TaskSlotIterator implements Iterator<TaskSlot<T>> {
+		private final Iterator<AllocationID> allSlots;
+		private final TaskSlotState state;
+
+		private TaskSlot<T> currentSlot;
+
+		private TaskSlotIterator(JobID jobId, TaskSlotState state) {
+
+			Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+
+			if (allocationIds == null || allocationIds.isEmpty()) {
+				allSlots = Collections.emptyIterator();
+			} else {
+				allSlots = allocationIds.iterator();
+			}
+
+			this.state = Preconditions.checkNotNull(state);
+
+			this.currentSlot = null;
+		}
+
+		@Override
+		public boolean hasNext() {
+			while (currentSlot == null && allSlots.hasNext()) {
+				AllocationID tempSlot = allSlots.next();
+
+				TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
+
+				if (taskSlot != null && taskSlot.getState() == state) {
+					currentSlot = taskSlot;
+				}
+			}
+
+			return currentSlot != null;
+		}
+
+		@Override
+		public TaskSlot<T> next() {
+			if (currentSlot != null) {
+				TaskSlot<T> result = currentSlot;
+
+				currentSlot = null;
+
+				return result;
+			} else {
+				while (true) {
+					AllocationID tempSlot;
+
+					try {
+						tempSlot = allSlots.next();
+					} catch (NoSuchElementException e) {
+						throw new NoSuchElementException("No more task slots.");
+					}
+
+					TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
+
+					if (taskSlot != null && taskSlot.getState() == state) {
+						return taskSlot;
+					}
+				}
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Cannot remove task slots via this iterator.");
+		}
+	}
+
+	/**
+	 * Iterator over all {@link TaskSlotPayload} for a given job.
+	 */
+	private final class PayloadIterator implements Iterator<T> {
+		private final Iterator<TaskSlot<T>> taskSlotIterator;
+
+		private Iterator<T> currentTasks;
+
+		private PayloadIterator(JobID jobId) {
+			this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
+
+			this.currentTasks = null;
+		}
+
+		@Override
+		public boolean hasNext() {
+			while ((currentTasks == null || !currentTasks.hasNext()) && taskSlotIterator.hasNext()) {
+				TaskSlot<T> taskSlot = taskSlotIterator.next();
+
+				currentTasks = taskSlot.getTasks();
+			}
+
+			return (currentTasks != null && currentTasks.hasNext());
+		}
+
+		@Override
+		public T next() {
+			while ((currentTasks == null || !currentTasks.hasNext())) {
+				TaskSlot<T> taskSlot;
+
+				try {
+					taskSlot = taskSlotIterator.next();
+				} catch (NoSuchElementException e) {
+					throw new NoSuchElementException("No more tasks.");
+				}
+
+				currentTasks = taskSlot.getTasks();
+			}
+
+			return currentTasks.next();
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");
+		}
+	}
+
+	private enum State {
+		CREATED,
+		RUNNING,
+		CLOSING,
+		CLOSED
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index cdde9be..e82e9b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -33,17 +33,13 @@
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.messages.TaskBackPressureResponse;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -55,10 +51,15 @@
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
@@ -141,7 +142,7 @@
 
 		final ExecutionAttemptID eid = new ExecutionAttemptID();
 
-		final TaskDeploymentDescriptor tdd = createTestTaskDeploymentDescriptor("test task", eid, TaskExecutorTest.TestInvokable.class);
+		final TaskDeploymentDescriptor tdd = createTestTaskDeploymentDescriptor("test task", eid, FutureCompletingInvokable.class);
 
 		final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
 
@@ -819,4 +820,21 @@
 			producedPartitions,
 			inputGates);
 	}
+
+	/**
+	 * Test invokable which completes the given future when executed.
+	 */
+	public static class FutureCompletingInvokable extends AbstractInvokable {
+
+		static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture<>();
+
+		public FutureCompletingInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			COMPLETABLE_FUTURE.complete(true);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 423e401..4094be7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -43,8 +43,6 @@
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
@@ -57,7 +55,6 @@
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
-import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
@@ -67,7 +64,6 @@
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
@@ -98,14 +94,12 @@
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
+import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
@@ -282,7 +276,7 @@
 
 	@Test
 	public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+		final TaskSlotTableImpl<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
 
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
 
@@ -316,7 +310,7 @@
 			RpcUtils.terminateRpcEndpoint(taskManager, timeout);
 		}
 
-		assertThat(taskSlotTable.isStopped(), is(true));
+		assertThat(taskSlotTable.isClosed(), is(true));
 		assertThat(nettyShuffleEnvironment.isClosed(), is(true));
 		assertThat(kvStateService.isShutdown(), is(true));
 	}
@@ -429,9 +423,11 @@
 
 		rpc.registerGateway(rmAddress, rmGateway);
 
-		final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
-		final SlotReport slotReport = new SlotReport();
-		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+		final TaskSlotTable<Task> taskSlotTable = TestingTaskSlotTable
+			.<Task>newBuilder()
+			.createSlotReportSupplier(SlotReport::new)
+			.closeAsyncReturns(CompletableFuture.completedFuture(null))
+			.build();
 
 		HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
 
@@ -512,7 +508,12 @@
 				new JobID(),
 				new AllocationID()));
 
-		final TestingTaskSlotTable taskSlotTable = new TestingTaskSlotTable(new ArrayDeque<>(Arrays.asList(slotReport1, slotReport2)));
+		final Queue<SlotReport> reports = new ArrayDeque<>(Arrays.asList(slotReport1, slotReport2));
+		final TaskSlotTable<Task> taskSlotTable = TestingTaskSlotTable
+			.<Task>newBuilder()
+			.createSlotReportSupplier(reports::poll)
+			.closeAsyncReturns(CompletableFuture.completedFuture(null))
+			.build();
 
 		final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
@@ -632,9 +633,11 @@
 		rpc.registerGateway(address1, rmGateway1);
 		rpc.registerGateway(address2, rmGateway2);
 
-		final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
-		final SlotReport slotReport = new SlotReport();
-		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+		final TaskSlotTable<Task> taskSlotTable = TestingTaskSlotTable
+			.<Task>newBuilder()
+			.createSlotReportSupplier(SlotReport::new)
+			.closeAsyncReturns(CompletableFuture.completedFuture(null))
+			.build();
 
 		final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
@@ -680,123 +683,26 @@
 		}
 	}
 
-	/**
-	 * Tests that we can submit a task to the TaskManager given that we've allocated a slot there.
-	 */
-	@Test(timeout = 10000L)
-	public void testTaskSubmission() throws Exception {
-		final JobMasterId jobMasterId = JobMasterId.generate();
-		final AllocationID allocationId = new AllocationID();
-		final TaskDeploymentDescriptor taskDeploymentDescriptor = TaskDeploymentDescriptorBuilder
-			.newBuilder(jobId, TestInvokable.class)
-			.setAllocationId(allocationId)
-			.build();
-
-		final OneShotLatch taskInTerminalState = new OneShotLatch();
-		final TaskManagerActions taskManagerActions = createTaskManagerActionsWithTerminalStateTrigger(taskInTerminalState);
-		final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId, taskManagerActions);
-		final TaskExecutor taskExecutor = createTaskExecutorWithJobManagerTable(jobManagerTable);
-
-		try {
-			taskExecutor.start();
-
-			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
-			final JobMasterGateway jobMasterGateway = jobManagerTable.get(jobId).getJobManagerGateway();
-			requestSlotFromTaskExecutor(taskExecutorGateway, jobMasterGateway, allocationId);
-
-			taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterId, timeout);
-
-			CompletableFuture<Boolean> completionFuture = TestInvokable.COMPLETABLE_FUTURE;
-
-			completionFuture.get();
-
-			taskInTerminalState.await();
-		} finally {
-			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
-		}
-	}
-
-	/**
-	 * Test invokable which completes the given future when executed.
-	 */
-	public static class TestInvokable extends AbstractInvokable {
-
-		static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture<>();
-
-		public TestInvokable(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			COMPLETABLE_FUTURE.complete(true);
-		}
-	}
-
 	@Test
-	public void testTaskInterruptionAndTerminationOnShutdown() throws Exception {
-		final JobMasterId jobMasterId = JobMasterId.generate();
-		final AllocationID allocationId = new AllocationID();
-		final TaskDeploymentDescriptor taskDeploymentDescriptor = TaskDeploymentDescriptorBuilder
-			.newBuilder(jobId, TestInterruptableInvokable.class)
-			.setAllocationId(allocationId)
-			.build();
-
-		final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId, new NoOpTaskManagerActions());
-		final TaskExecutor taskExecutor = createTaskExecutorWithJobManagerTable(jobManagerTable);
-
+	public void testTaskSlotTableTerminationOnShutdown() throws Exception {
+		CompletableFuture<Void> taskSlotTableClosingFuture = new CompletableFuture<>();
+		TaskExecutorTestingContext submissionContext = createTaskExecutorTestingContext(
+			TestingTaskSlotTable.<Task>newBuilder().closeAsyncReturns(taskSlotTableClosingFuture).build());
+		final CompletableFuture<Void> taskExecutorTerminationFuture;
 		try {
-			taskExecutor.start();
-
-			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
-			final JobMasterGateway jobMasterGateway = jobManagerTable.get(jobId).getJobManagerGateway();
-			requestSlotFromTaskExecutor(taskExecutorGateway, jobMasterGateway, allocationId);
-
-			taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterId, timeout);
-
-			TestInterruptableInvokable.STARTED_FUTURE.get();
+			submissionContext.start();
 		} finally {
-			taskExecutor.closeAsync();
+			taskExecutorTerminationFuture = submissionContext.taskExecutor.closeAsync();
 		}
 
-		// check task has been interrupted
-		TestInterruptableInvokable.INTERRUPTED_FUTURE.get();
-
 		// check task executor is waiting for the task completion and has not terminated yet
-		final CompletableFuture<Void> taskExecutorTerminationFuture = taskExecutor.getTerminationFuture();
 		assertThat(taskExecutorTerminationFuture.isDone(), is(false));
 
-		// check task executor has exited after task completion
-		TestInterruptableInvokable.DONE_FUTURE.complete(null);
+		// check task executor has exited after task slot table termination
+		taskSlotTableClosingFuture.complete(null);
 		taskExecutorTerminationFuture.get();
 	}
 
-	private void requestSlotFromTaskExecutor(
-			TaskExecutorGateway taskExecutorGateway,
-			JobMasterGateway jobMasterGateway,
-			AllocationID allocationId) throws ExecutionException, InterruptedException {
-		final CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture =
-			new CompletableFuture<>();
-		ResourceManagerId resourceManagerId = createAndRegisterResourceManager(initialSlotReportFuture);
-		initialSlotReportFuture.get();
-
-		taskExecutorGateway
-			.requestSlot(
-				new SlotID(ResourceID.generate(), 0),
-				jobId,
-				allocationId,
-				ResourceProfile.ZERO,
-				jobMasterGateway.getAddress(),
-				resourceManagerId,
-				timeout)
-			.get();
-
-		// now inform the task manager about the new job leader
-		jobManagerLeaderRetriever.notifyListener(
-			jobMasterGateway.getAddress(),
-			jobMasterGateway.getFencingToken().toUUID());
-	}
-
 	private ResourceManagerId createAndRegisterResourceManager(
 			CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture) {
 		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
@@ -814,52 +720,6 @@
 		return resourceManagerGateway.getFencingToken();
 	}
 
-	private TaskExecutor createTaskExecutorWithJobManagerTable(JobManagerTable jobManagerTable) throws IOException {
-		final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
-		return createTaskExecutor(new TaskManagerServicesBuilder()
-			.setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1))
-			.setJobManagerTable(jobManagerTable)
-			.setTaskStateManager(localStateStoresManager)
-			.build());
-	}
-
-	private JobManagerTable createJobManagerTableWithOneJob(
-			JobMasterId jobMasterId,
-			TaskManagerActions taskManagerActions) {
-		final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
-			.setFencingTokenSupplier(() -> jobMasterId)
-			.setOfferSlotsFunction((resourceID, slotOffers) -> CompletableFuture.completedFuture(slotOffers))
-			.build();
-		rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
-
-		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
-			jobId,
-			ResourceID.generate(),
-			jobMasterGateway,
-			taskManagerActions,
-			new TestCheckpointResponder(),
-			new TestGlobalAggregateManager(),
-			ContextClassLoaderLibraryCacheManager.INSTANCE,
-			new NoOpResultPartitionConsumableNotifier(),
-			(j, i, r) -> CompletableFuture.completedFuture(null));
-
-		final JobManagerTable jobManagerTable = new JobManagerTable();
-		jobManagerTable.put(jobId, jobManagerConnection);
-		return jobManagerTable;
-	}
-
-	private static TaskManagerActions createTaskManagerActionsWithTerminalStateTrigger(
-		final OneShotLatch taskInTerminalState) {
-		return new NoOpTaskManagerActions() {
-			@Override
-			public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
-				if (taskExecutionState.getExecutionState().isTerminal()) {
-					taskInTerminalState.trigger();
-				}
-			}
-		};
-	}
-
 	/**
 	 * Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
 	 * the job leader, it will offer all reserved slots to the JobManager.
@@ -1963,23 +1823,9 @@
 
 	@Test
 	public void testDynamicSlotAllocation() throws Exception {
-		final JobMasterId jobMasterId = JobMasterId.generate();
 		final AllocationID allocationId = new AllocationID();
-
-		final OneShotLatch taskInTerminalState = new OneShotLatch();
-		final TaskManagerActions taskManagerActions = createTaskManagerActionsWithTerminalStateTrigger(taskInTerminalState);
-		final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId, taskManagerActions);
-		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-		final TaskExecutor taskExecutor = createTaskExecutor(new TaskManagerServicesBuilder()
-			.setTaskSlotTable(taskSlotTable)
-			.setJobManagerTable(jobManagerTable)
-			.build());
-
-		try {
-			taskExecutor.start();
-
-			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
-			final JobMasterGateway jobMasterGateway = jobManagerTable.get(jobId).getJobManagerGateway();
+		try (TaskExecutorTestingContext submissionContext = createTaskExecutorTestingContext(2)) {
+			submissionContext.start();
 			final CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture =
 				new CompletableFuture<>();
 			ResourceManagerId resourceManagerId = createAndRegisterResourceManager(initialSlotReportFuture);
@@ -1987,25 +1833,24 @@
 			final ResourceProfile resourceProfile = DEFAULT_RESOURCE_PROFILE
 				.merge(ResourceProfile.newBuilder().setCpuCores(0.1).build());
 
-			taskExecutorGateway
+			submissionContext.taskExecutor
+				.getSelfGateway(TaskExecutorGateway.class)
 				.requestSlot(
 					SlotID.generateDynamicSlotID(ResourceID.generate()),
 					jobId,
 					allocationId,
 					resourceProfile,
-					jobMasterGateway.getAddress(),
+					submissionContext.jobMasterGateway.getAddress(),
 					resourceManagerId,
 					timeout)
 				.get();
 
 			ResourceID resourceId = ResourceID.generate();
-			SlotReport slotReport = taskSlotTable.createSlotReport(resourceId);
+			SlotReport slotReport = submissionContext.taskSlotTable.createSlotReport(resourceId);
 			assertThat(slotReport, containsInAnyOrder(
-					new SlotStatus(new SlotID(resourceId, 0), DEFAULT_RESOURCE_PROFILE),
-					new SlotStatus(new SlotID(resourceId, 1), DEFAULT_RESOURCE_PROFILE),
-					new SlotStatus(SlotID.generateDynamicSlotID(resourceId), resourceProfile, jobId, allocationId)));
-		} finally {
-			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+				new SlotStatus(new SlotID(resourceId, 0), DEFAULT_RESOURCE_PROFILE),
+				new SlotStatus(new SlotID(resourceId, 1), DEFAULT_RESOURCE_PROFILE),
+				new SlotStatus(SlotID.generateDynamicSlotID(resourceId), resourceProfile, jobId, allocationId)));
 		}
 	}
 
@@ -2070,6 +1915,95 @@
 			TaskManagerRunner.createBackPressureSampleService(configuration, rpc.getScheduledExecutor()));
 	}
 
+	private TaskExecutorTestingContext createTaskExecutorTestingContext(int numberOfSlots) throws IOException {
+		return createTaskExecutorTestingContext(TaskSlotUtils.createTaskSlotTable(numberOfSlots));
+	}
+
+	private TaskExecutorTestingContext createTaskExecutorTestingContext(final TaskSlotTable<Task> taskSlotTable) throws IOException {
+		final OneShotLatch offerSlotsLatch = new OneShotLatch();
+		final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
+			.setOfferSlotsFunction((resourceID, slotOffers) -> {
+				offerSlotsLatch.trigger();
+				return CompletableFuture.completedFuture(slotOffers);
+			}).build();
+		rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+		final JobLeaderService jobLeaderService = new JobLeaderService(
+			taskManagerLocation,
+			RetryingRegistrationConfiguration.defaultConfiguration());
+
+		TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
+		final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(new TaskManagerServicesBuilder()
+			.setTaskSlotTable(taskSlotTable)
+			.setJobLeaderService(jobLeaderService)
+			.setTaskStateManager(stateStoresManager)
+			.build());
+
+		jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
+		return new TaskExecutorTestingContext(offerSlotsLatch, jobMasterGateway, jobLeaderService, taskSlotTable, taskExecutor);
+	}
+
+	private class TaskExecutorTestingContext implements AutoCloseable {
+		private final OneShotLatch offerSlotsLatch;
+		private final TestingJobMasterGateway jobMasterGateway;
+		private final JobLeaderService jobLeaderService;
+		private final TaskSlotTable taskSlotTable;
+		private final TestingTaskExecutor taskExecutor;
+
+		private TaskExecutorTestingContext(
+				OneShotLatch offerSlotsLatch,
+				TestingJobMasterGateway jobMasterGateway,
+				JobLeaderService jobLeaderService,
+				TaskSlotTable taskSlotTable,
+				TestingTaskExecutor taskExecutor) {
+			this.offerSlotsLatch = offerSlotsLatch;
+			this.jobMasterGateway = jobMasterGateway;
+			this.jobLeaderService = jobLeaderService;
+			this.taskSlotTable = taskSlotTable;
+			this.taskExecutor = taskExecutor;
+		}
+
+		private void start() {
+			taskExecutor.start();
+			taskExecutor.waitUntilStarted();
+		}
+
+		private void startAllocateSlotAndSubmit(
+			final Class<? extends AbstractInvokable> task) throws Exception {
+			final AllocationID allocationId = new AllocationID();
+
+			start();
+
+			taskSlotTable.allocateSlot(0, jobId, allocationId, Time.milliseconds(10000L));
+
+			// we have to add the job after the TaskExecutor, because otherwise the service has not
+			// been properly started.
+			jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());
+			offerSlotsLatch.await();
+
+			taskExecutor
+				.getSelfGateway(TaskExecutorGateway.class)
+				.submitTask(
+					createTaskDeploymentDescriptor(allocationId, task),
+					jobMasterGateway.getFencingToken(),
+					timeout);
+		}
+
+		private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+			final AllocationID allocationId,
+			final Class<? extends AbstractInvokable> task) throws IOException {
+			return TaskDeploymentDescriptorBuilder
+				.newBuilder(jobId, task)
+				.setAllocationId(allocationId)
+				.build();
+		}
+
+		@Override
+		public void close() throws ExecutionException, InterruptedException, TimeoutException {
+			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+		}
+	}
+
 	private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService {
 		private final CompletableFuture<LeaderRetrievalListener> startFuture;
 
@@ -2165,21 +2099,7 @@
 		}
 	}
 
-	private static final class TestingTaskSlotTable extends TaskSlotTable<Task> {
-		private final Queue<SlotReport> slotReports;
-
-		private TestingTaskSlotTable(Queue<SlotReport> slotReports) {
-			super(1, createTotalResourceProfile(1), DEFAULT_RESOURCE_PROFILE, MemoryManager.MIN_PAGE_SIZE, createDefaultTimerService(timeout.toMilliseconds()));
-			this.slotReports = slotReports;
-		}
-
-		@Override
-		public SlotReport createSlotReport(ResourceID resourceId) {
-			return slotReports.poll();
-		}
-	}
-
-	private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTable<Task> {
+	private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTableImpl<Task> {
 
 		private final OneShotLatch allocateSlotLatch;
 
@@ -2205,7 +2125,7 @@
 		}
 	}
 
-	private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTable<Task> {
+	private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTableImpl<Task> {
 
 		private final CountDownLatch slotsToActivate;
 
@@ -2225,36 +2145,4 @@
 			return result;
 		}
 	}
-
-	/**
-	 * Test invokable which completes the given future when interrupted (can be used only once).
-	 */
-	public static class TestInterruptableInvokable extends AbstractInvokable {
-		private static final CompletableFuture<Void> INTERRUPTED_FUTURE = new CompletableFuture<>();
-		private static final CompletableFuture<Void> STARTED_FUTURE = new CompletableFuture<>();
-		private static final CompletableFuture<Void> DONE_FUTURE = new CompletableFuture<>();
-
-		public TestInterruptableInvokable(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() {
-			STARTED_FUTURE.complete(null);
-
-			try {
-				INTERRUPTED_FUTURE.get();
-			} catch (InterruptedException e) {
-				INTERRUPTED_FUTURE.complete(null);
-			} catch (ExecutionException e) {
-				ExceptionUtils.rethrow(e);
-			}
-
-			try {
-				DONE_FUTURE.get();
-			} catch (ExecutionException | InterruptedException e) {
-				ExceptionUtils.rethrow(e);
-			}
-		}
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 8b348e4..c494ffd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -26,11 +26,14 @@
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.concurrent.CompletableFuture;
+
 import static org.mockito.Mockito.mock;
 
 /**
@@ -57,7 +60,7 @@
 		kvStateService = new KvStateService(new KvStateRegistry(), null, null);
 		broadcastVariableManager = new BroadcastVariableManager();
 		taskEventDispatcher = new TaskEventDispatcher();
-		taskSlotTable = (TaskSlotTable<Task>) mock(TaskSlotTable.class);
+		taskSlotTable = TestingTaskSlotTable.<Task>newBuilder().closeAsyncReturns(CompletableFuture.completedFuture(null)).build();
 		jobManagerTable = new JobManagerTable();
 		jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
 		taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 1661efb..6d47155 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -51,6 +51,7 @@
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -76,7 +77,6 @@
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -119,14 +119,16 @@
 
 		this.jobMasterId = jobMasterId;
 
-		if (slotSize > 0) {
-			this.taskSlotTable = TaskSlotUtils.createTaskSlotTable(slotSize);
-		} else {
-			//noinspection unchecked
-			this.taskSlotTable = (TaskSlotTable<Task>) mock(TaskSlotTable.class);
-			when(taskSlotTable.tryMarkSlotActive(eq(jobId), any())).thenReturn(true);
-			when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
-		}
+		this.taskSlotTable = slotSize > 0 ?
+			TaskSlotUtils.createTaskSlotTable(slotSize) :
+			TestingTaskSlotTable
+				.<Task>newBuilder()
+				.tryMarkSlotActiveReturns(true)
+				.addTaskReturns(true)
+				.closeAsyncReturns(CompletableFuture.completedFuture(null))
+				.allocateSlotReturns(true)
+				.memoryManagerGetterReturns(null)
+				.build();
 
 		JobMasterGateway jobMasterGateway;
 		if (testingJobMasterGateway == null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
index 55a71d8..098c367 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
@@ -24,6 +24,8 @@
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.util.TestLogger;
@@ -38,6 +40,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -48,19 +51,16 @@
  * Tests for the {@link TaskSlotTable}.
  */
 public class TaskSlotTableTest extends TestLogger {
-
 	private static final Time SLOT_TIMEOUT = Time.seconds(100L);
 
 	/**
 	 * Tests that one can can mark allocated slots as active.
 	 */
 	@Test
-	public void testTryMarkSlotActive() throws SlotNotFoundException {
-		final TaskSlotTable<?> taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
+	public void testTryMarkSlotActive() throws Exception {
+		final TaskSlotTableImpl<?> taskSlotTable = createTaskSlotTableAndStart(3);
 
 		try {
-			taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
 			final JobID jobId1 = new JobID();
 			final AllocationID allocationId1 = new AllocationID();
 			taskSlotTable.allocateSlot(0, jobId1, allocationId1, SLOT_TIMEOUT);
@@ -84,8 +84,8 @@
 
 			assertThat(Sets.newHashSet(taskSlotTable.getActiveSlots(jobId1)), is(equalTo(new HashSet<>(Arrays.asList(allocationId2, allocationId1)))));
 		} finally {
-			taskSlotTable.stop();
-			assertThat(taskSlotTable.isStopped(), is(true));
+			taskSlotTable.close();
+			assertThat(taskSlotTable.isClosed(), is(true));
 		}
 	}
 
@@ -93,12 +93,8 @@
 	 * Tests that redundant slot allocation with the same AllocationID to a different slot is rejected.
 	 */
 	@Test
-	public void testRedundantSlotAllocation() {
-		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
-		try {
-			taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+	public void testRedundantSlotAllocation() throws Exception {
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
 			final JobID jobId = new JobID();
 			final AllocationID allocationId = new AllocationID();
 
@@ -111,18 +107,12 @@
 			Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
 			assertThat(allocatedSlots.next().getIndex(), is(0));
 			assertThat(allocatedSlots.hasNext(), is(false));
-		} finally {
-			taskSlotTable.stop();
 		}
 	}
 
 	@Test
-	public void testFreeSlot() throws SlotNotFoundException {
-		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
-		try {
-			taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+	public void testFreeSlot() throws Exception {
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
 			final JobID jobId = new JobID();
 			final AllocationID allocationId1 = new AllocationID();
 			final AllocationID allocationId2 = new AllocationID();
@@ -138,18 +128,12 @@
 			assertThat(taskSlotTable.isAllocated(1, jobId, allocationId1), is(false));
 			assertThat(taskSlotTable.isAllocated(1, jobId, allocationId2), is(false));
 			assertThat(taskSlotTable.isSlotFree(1), is(true));
-		} finally {
-			taskSlotTable.stop();
 		}
 	}
 
 	@Test
-	public void testSlotAllocationWithDynamicSlotId() {
-		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
-		try {
-			taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+	public void testSlotAllocationWithDynamicSlotId() throws Exception {
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
 			final JobID jobId = new JobID();
 			final AllocationID allocationId = new AllocationID();
 
@@ -159,18 +143,12 @@
 			assertThat(allocatedSlots.next().getIndex(), is(-1));
 			assertThat(allocatedSlots.hasNext(), is(false));
 			assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true));
-		} finally {
-			taskSlotTable.stop();
 		}
 	}
 
 	@Test
-	public void testSlotAllocationWithResourceProfile() {
-		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
-		try {
-			taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+	public void testSlotAllocationWithResourceProfile() throws Exception {
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
 			final JobID jobId = new JobID();
 			final AllocationID allocationId = new AllocationID();
 			final ResourceProfile resourceProfile = TaskSlotUtils.DEFAULT_RESOURCE_PROFILE
@@ -183,18 +161,12 @@
 			assertThat(allocatedSlot.getIndex(), is(-1));
 			assertThat(allocatedSlot.getResourceProfile(), is(resourceProfile));
 			assertThat(allocatedSlots.hasNext(), is(false));
-		} finally {
-			taskSlotTable.stop();
 		}
 	}
 
 	@Test
-	public void testSlotAllocationWithResourceProfileFailure() {
-		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
-
-		try {
-			taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+	public void testSlotAllocationWithResourceProfileFailure() throws Exception {
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) {
 			final JobID jobId = new JobID();
 			final AllocationID allocationId = new AllocationID();
 			ResourceProfile resourceProfile = TaskSlotUtils.DEFAULT_RESOURCE_PROFILE;
@@ -204,18 +176,12 @@
 
 			Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
 			assertThat(allocatedSlots.hasNext(), is(false));
-		} finally {
-			taskSlotTable.stop();
 		}
 	}
 
 	@Test
-	public void testGenerateSlotReport() throws SlotNotFoundException {
-		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
-
-		try {
-			taskSlotTable.start(new TestingSlotActionsBuilder().build());
-
+	public void testGenerateSlotReport() throws Exception {
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(3)) {
 			final JobID jobId = new JobID();
 			final AllocationID allocationId1 = new AllocationID();
 			final AllocationID allocationId2 = new AllocationID();
@@ -238,8 +204,113 @@
 				is(new SlotStatus(new SlotID(resourceId, 1), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null)),
 				is(new SlotStatus(new SlotID(resourceId, 2), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null)),
 				is(new SlotStatus(SlotID.generateDynamicSlotID(resourceId), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId3))));
-		} finally {
-			taskSlotTable.stop();
 		}
 	}
+
+	@Test
+	public void testAllocateSlot() throws Exception {
+		final JobID jobId = new JobID();
+		final AllocationID allocationId = new AllocationID();
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable =
+				 createTaskSlotTableWithAllocatedSlot(jobId, allocationId, new TestingSlotActionsBuilder().build())) {
+			Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+			TaskSlot<TaskSlotPayload> nextSlot = allocatedSlots.next();
+			assertThat(nextSlot.getIndex(), is(0));
+			assertThat(nextSlot.getAllocationId(), is(allocationId));
+			assertThat(nextSlot.getJobId(), is(jobId));
+			assertThat(allocatedSlots.hasNext(), is(false));
+		}
+	}
+
+	@Test
+	public void testAddTask() throws Exception {
+		final JobID jobId = new JobID();
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+		final AllocationID allocationId = new AllocationID();
+		TaskSlotPayload task = new TestingTaskSlotPayload(jobId, executionAttemptId, allocationId).terminate();
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithStartedTask(task)) {
+			Iterator<TaskSlotPayload> tasks = taskSlotTable.getTasks(jobId);
+			TaskSlotPayload nextTask = tasks.next();
+			assertThat(nextTask.getExecutionId(), is(executionAttemptId));
+			assertThat(nextTask.getAllocationId(), is(allocationId));
+			assertThat(tasks.hasNext(), is(false));
+		}
+	}
+
+	@Test(timeout = 10000)
+	public void testRemoveTaskCallsFreeSlotAction() throws Exception {
+		final JobID jobId = new JobID();
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+		final AllocationID allocationId = new AllocationID();
+		CompletableFuture<AllocationID> freeSlotFuture = new CompletableFuture<>();
+		SlotActions slotActions = new TestingSlotActions(freeSlotFuture::complete, (aid, uid) -> {});
+		TaskSlotPayload task = new TestingTaskSlotPayload(jobId, executionAttemptId, allocationId).terminate();
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithStartedTask(task, slotActions)) {
+			// we have to initiate closing of the slot externally
+			// to enable that the last remaining finished task does the final slot freeing
+			taskSlotTable.freeSlot(allocationId);
+			taskSlotTable.removeTask(executionAttemptId);
+			assertThat(freeSlotFuture.get(), is(allocationId));
+		}
+	}
+
+	@Test(timeout = 10000)
+	public void testFreeSlotInterruptsSubmittedTask() throws Exception {
+		TestingTaskSlotPayload task = new TestingTaskSlotPayload();
+		try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithStartedTask(task)) {
+			assertThat(taskSlotTable.freeSlot(task.getAllocationId()), is(-1));
+			task.waitForFailure();
+			task.terminate();
+		}
+	}
+
+	@Test(timeout = 10000)
+	public void testTableIsClosedOnlyWhenAllTasksTerminated() throws Exception {
+		TestingTaskSlotPayload task = new TestingTaskSlotPayload();
+		final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithStartedTask(task);
+		assertThat(taskSlotTable.freeSlot(task.getAllocationId()), is(-1));
+		CompletableFuture<Void> closingFuture = taskSlotTable.closeAsync();
+		assertThat(closingFuture.isDone(), is(false));
+		task.terminate();
+		closingFuture.get();
+	}
+
+	private static TaskSlotTable<TaskSlotPayload> createTaskSlotTableWithStartedTask(
+			final TaskSlotPayload task) throws SlotNotFoundException, SlotNotActiveException {
+		return createTaskSlotTableWithStartedTask(task, new TestingSlotActionsBuilder().build());
+	}
+
+	private static TaskSlotTable<TaskSlotPayload> createTaskSlotTableWithStartedTask(
+			final TaskSlotPayload task,
+			final SlotActions slotActions) throws SlotNotFoundException, SlotNotActiveException {
+		final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableWithAllocatedSlot(
+			task.getJobID(),
+			task.getAllocationId(),
+			slotActions);
+		taskSlotTable.markSlotActive(task.getAllocationId());
+		taskSlotTable.addTask(task);
+		return taskSlotTable;
+	}
+
+	private static TaskSlotTable<TaskSlotPayload> createTaskSlotTableWithAllocatedSlot(
+			final JobID jobId,
+			final AllocationID allocationId,
+			final SlotActions slotActions) {
+		final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(1, slotActions);
+		assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, SLOT_TIMEOUT), is(true));
+		return taskSlotTable;
+	}
+
+	private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableAndStart(final int numberOfSlots) {
+		return createTaskSlotTableAndStart(numberOfSlots, new TestingSlotActionsBuilder().build());
+	}
+
+	private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableAndStart(
+			final int numberOfSlots,
+			final SlotActions slotActions) {
+		final TaskSlotTableImpl<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(numberOfSlots);
+		taskSlotTable.start(slotActions, ComponentMainThreadExecutorServiceAdapter.forMainThread());
+		return taskSlotTable;
+	}
+
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java
new file mode 100644
index 0000000..e922c9e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class TaskSlotTest extends TestLogger {
+	private static final JobID JOB_ID = new JobID();
+	private static final AllocationID ALLOCATION_ID = new AllocationID();
+
+	@Test
+	public void testTaskSlotClosedOnlyWhenAddedTasksTerminated() throws Exception {
+		try (TaskSlot<TaskSlotPayload> taskSlot = createTaskSlot()) {
+			taskSlot.markActive();
+			TestingTaskSlotPayload task = new TestingTaskSlotPayload(JOB_ID, new ExecutionAttemptID(), ALLOCATION_ID);
+			taskSlot.add(task);
+
+			CompletableFuture<Void> closingFuture = taskSlot.closeAsync();
+			task.waitForFailure();
+			MemoryManager memoryManager = taskSlot.getMemoryManager();
+
+			assertThat(closingFuture.isDone(), is(false));
+			assertThat(memoryManager.isShutdown(), is(false));
+			task.terminate();
+			closingFuture.get();
+			assertThat(memoryManager.isShutdown(), is(true));
+		}
+	}
+
+	private static <T extends TaskSlotPayload> TaskSlot<T> createTaskSlot() {
+		return new TaskSlot<>(0, ResourceProfile.ZERO, MemoryManager.MIN_PAGE_SIZE, JOB_ID, ALLOCATION_ID);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
index 27c8d1c..38d0c97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
@@ -39,10 +39,10 @@
 		.setNetworkMemory(new MemorySize(100 * 1024))
 		.build();
 
-	public static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(int numberOfSlots) {
+	public static <T extends TaskSlotPayload> TaskSlotTableImpl<T> createTaskSlotTable(int numberOfSlots) {
 		return createTaskSlotTable(
 			numberOfSlots,
-			createDefaultTimerService(DEFAULT_SLOT_TIMEOUT));
+			createDefaultTimerService());
 	}
 
 	public static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(int numberOfSlots, Time timeout) {
@@ -51,10 +51,10 @@
 			createDefaultTimerService(timeout.toMilliseconds()));
 	}
 
-	private static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(
+	private static <T extends TaskSlotPayload> TaskSlotTableImpl<T> createTaskSlotTable(
 			int numberOfSlots,
 			TimerService<AllocationID> timerService) {
-		return new TaskSlotTable<>(
+		return new TaskSlotTableImpl<>(
 			numberOfSlots,
 			createTotalResourceProfile(numberOfSlots),
 			DEFAULT_RESOURCE_PROFILE,
@@ -70,6 +70,10 @@
 		return result;
 	}
 
+	public static TimerService<AllocationID> createDefaultTimerService() {
+		return createDefaultTimerService(DEFAULT_SLOT_TIMEOUT);
+	}
+
 	public static TimerService<AllocationID> createDefaultTimerService(long shutdownTimeout) {
 		return new TimerService<>(TestingUtils.defaultExecutor(), shutdownTimeout);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotPayload.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotPayload.java
new file mode 100644
index 0000000..2b362ed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotPayload.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.concurrent.CompletableFuture;
+
+class TestingTaskSlotPayload implements TaskSlotPayload {
+	private final JobID jobId;
+	private final ExecutionAttemptID executionAttemptID;
+	private final AllocationID allocationID;
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+	private final OneShotLatch failLatch = new OneShotLatch();
+
+	TestingTaskSlotPayload() {
+		this(new JobID(), new ExecutionAttemptID(), new AllocationID());
+	}
+
+	TestingTaskSlotPayload(JobID jobId, ExecutionAttemptID executionAttemptID, AllocationID allocationID) {
+		this.jobId = jobId;
+		this.executionAttemptID = executionAttemptID;
+		this.allocationID = allocationID;
+	}
+
+	@Override
+	public JobID getJobID() {
+		return jobId;
+	}
+
+	@Override
+	public ExecutionAttemptID getExecutionId() {
+		return executionAttemptID;
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return allocationID;
+	}
+
+	@Override
+	public CompletableFuture<Void> getTerminationFuture() {
+		return terminationFuture;
+	}
+
+	@Override
+	public void failExternally(Throwable cause) {
+		failLatch.trigger();
+	}
+
+	void waitForFailure() throws InterruptedException {
+		failLatch.await();
+	}
+
+	TestingTaskSlotPayload terminate() {
+		terminationFuture.complete(null);
+		return this;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotTable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotTable.java
new file mode 100644
index 0000000..3618df9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingTaskSlotTable.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of {@link TaskSlotTable} for tests.
+ */
+public class TestingTaskSlotTable<T extends TaskSlotPayload> implements TaskSlotTable<T> {
+	private final Supplier<SlotReport> createSlotReportSupplier;
+	private final Supplier<Boolean> allocateSlotSupplier;
+	private final BiFunction<JobID, AllocationID, Boolean> tryMarkSlotActiveBiFunction;
+	private final Function<T, Boolean> addTaskFunction;
+	private final Function<AllocationID, MemoryManager> memoryManagerGetter;
+	private final Supplier<CompletableFuture<Void>> closeAsyncSupplier;
+
+	private TestingTaskSlotTable(
+			Supplier<SlotReport> createSlotReportSupplier,
+			Supplier<Boolean> allocateSlotSupplier,
+			BiFunction<JobID, AllocationID, Boolean> tryMarkSlotActiveBiFunction,
+			Function<T, Boolean> addTaskFunction,
+			Function<AllocationID, MemoryManager> memoryManagerGetter,
+			Supplier<CompletableFuture<Void>> closeAsyncSupplier) {
+		this.createSlotReportSupplier = createSlotReportSupplier;
+		this.allocateSlotSupplier = allocateSlotSupplier;
+		this.tryMarkSlotActiveBiFunction = tryMarkSlotActiveBiFunction;
+		this.addTaskFunction = addTaskFunction;
+		this.memoryManagerGetter = memoryManagerGetter;
+		this.closeAsyncSupplier = closeAsyncSupplier;
+	}
+
+	@Override
+	public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
+
+	}
+
+	@Override
+	public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public SlotReport createSlotReport(ResourceID resourceId) {
+		return createSlotReportSupplier.get();
+	}
+
+	@Override
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
+		return allocateSlotSupplier.get();
+	}
+
+	@Override
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
+		return allocateSlotSupplier.get();
+	}
+
+	@Override
+	public boolean markSlotActive(AllocationID allocationId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int freeSlot(AllocationID allocationId, Throwable cause) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
+		return tryMarkSlotActiveBiFunction.apply(jobId, allocationId);
+	}
+
+	@Override
+	public boolean isSlotFree(int index) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean hasAllocatedSlots(JobID jobId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Nullable
+	@Override
+	public JobID getOwningJob(AllocationID allocationId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean addTask(T task) {
+		return addTaskFunction.apply(task);
+	}
+
+	@Override
+	public T removeTask(ExecutionAttemptID executionAttemptID) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public T getTask(ExecutionAttemptID executionAttemptID) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Iterator<T> getTasks(JobID jobId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public AllocationID getCurrentAllocation(int index) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public MemoryManager getTaskMemoryManager(AllocationID allocationID) {
+		return memoryManagerGetter.apply(allocationID);
+	}
+
+	@Override
+	public void notifyTimeout(AllocationID key, UUID ticket) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return closeAsyncSupplier.get();
+	}
+
+	public static <T extends TaskSlotPayload> TestingTaskSlotTableBuilder<T> newBuilder() {
+		return new TestingTaskSlotTableBuilder<>();
+	}
+
+	/**
+	 * Builder for {@link TestingTaskSlotTable}.
+	 */
+	public static class TestingTaskSlotTableBuilder<T extends TaskSlotPayload> {
+		private Supplier<SlotReport> createSlotReportSupplier = SlotReport::new;
+		private Supplier<Boolean> allocateSlotSupplier = () -> false;
+		private BiFunction<JobID, AllocationID, Boolean> tryMarkSlotActiveBiFunction = (ignoredA, ignoredB) -> false;
+		private Function<T, Boolean> addTaskFunction = (ignored) -> false;
+		private Function<AllocationID, MemoryManager> memoryManagerGetter = ignored -> {
+			throw new UnsupportedOperationException("No memory manager getter has been set.");
+		};
+		private Supplier<CompletableFuture<Void>> closeAsyncSupplier = FutureUtils::completedVoidFuture;
+
+		public TestingTaskSlotTableBuilder<T> createSlotReportSupplier(Supplier<SlotReport> createSlotReportSupplier) {
+			this.createSlotReportSupplier = createSlotReportSupplier;
+			return this;
+		}
+
+		public TestingTaskSlotTableBuilder<T> allocateSlotReturns(boolean toReturn) {
+			this.allocateSlotSupplier = () -> toReturn;
+			return this;
+		}
+
+		public TestingTaskSlotTableBuilder<T> tryMarkSlotActiveReturns(boolean toReturn) {
+			this.tryMarkSlotActiveBiFunction = (stub1, stub2) -> toReturn;
+			return this;
+		}
+
+		public TestingTaskSlotTableBuilder<T> addTaskReturns(boolean toReturn) {
+			this.addTaskFunction = stub -> toReturn;
+			return this;
+		}
+
+		public TestingTaskSlotTableBuilder<T> memoryManagerGetterReturns(MemoryManager toReturn) {
+			this.memoryManagerGetter = stub -> toReturn;
+			return this;
+		}
+
+		public TestingTaskSlotTableBuilder<T> closeAsyncReturns(CompletableFuture<Void> toReturn) {
+			this.closeAsyncSupplier = () -> toReturn;
+			return this;
+		}
+
+		public TaskSlotTable<T> build() {
+			return new TestingTaskSlotTable<>(
+				createSlotReportSupplier,
+				allocateSlotSupplier,
+				tryMarkSlotActiveBiFunction,
+				addTaskFunction,
+				memoryManagerGetter,
+				closeAsyncSupplier);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
index f907bc0..925b388 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -42,7 +42,6 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -84,6 +83,9 @@
 	private Collection<PermanentBlobKey> requiredJarFileBlobKeys = Collections.emptyList();
 	private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
 	private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
+	private JobID jobId = new JobID();
+	private AllocationID allocationID = new AllocationID();
+	private ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
 
 	public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
 		this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
@@ -149,10 +151,23 @@
 		return this;
 	}
 
+	public TestTaskBuilder setJobId(JobID jobId) {
+		this.jobId = jobId;
+		return this;
+	}
+
+	public TestTaskBuilder setAllocationID(AllocationID allocationID) {
+		this.allocationID = allocationID;
+		return this;
+	}
+
+	public TestTaskBuilder setExecutionAttemptId(ExecutionAttemptID executionAttemptId) {
+		this.executionAttemptId = executionAttemptId;
+		return this;
+	}
+
 	public Task build() throws Exception {
-		final JobID jobId = new JobID();
 		final JobVertexID jobVertexId = new JobVertexID();
-		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
 
 		final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
 
@@ -182,7 +197,7 @@
 			jobInformation,
 			taskInformation,
 			executionAttemptId,
-			new AllocationID(),
+			allocationID,
 			0,
 			0,
 			resultPartitions,