[hotfix] Segregate TaskSlotPayload interface from Task for TaskSlot and TaskSlotTable
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 778669d..fe272ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -196,7 +196,7 @@
 
 	// --------- task slot allocation table -----------
 
-	private final TaskSlotTable taskSlotTable;
+	private final TaskSlotTable<Task> taskSlotTable;
 
 	private final JobManagerTable jobManagerTable;
 
@@ -1143,7 +1143,7 @@
 
 				final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
 
-				final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
+				final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
 				final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
 
 				final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 91d23b5..1d140b2 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -65,7 +66,7 @@
 	private final ShuffleEnvironment<?, ?> shuffleEnvironment;
 	private final KvStateService kvStateService;
 	private final BroadcastVariableManager broadcastVariableManager;
-	private final TaskSlotTable taskSlotTable;
+	private final TaskSlotTable<Task> taskSlotTable;
 	private final JobManagerTable jobManagerTable;
 	private final JobLeaderService jobLeaderService;
 	private final TaskExecutorLocalStateStoresManager taskManagerStateStore;
@@ -78,7 +79,7 @@
 		ShuffleEnvironment<?, ?> shuffleEnvironment,
 		KvStateService kvStateService,
 		BroadcastVariableManager broadcastVariableManager,
-		TaskSlotTable taskSlotTable,
+		TaskSlotTable<Task> taskSlotTable,
 		JobManagerTable jobManagerTable,
 		JobLeaderService jobLeaderService,
 		TaskExecutorLocalStateStoresManager taskManagerStateStore,
@@ -125,7 +126,7 @@
 		return broadcastVariableManager;
 	}
 
-	public TaskSlotTable getTaskSlotTable() {
+	public TaskSlotTable<Task> getTaskSlotTable() {
 		return taskSlotTable;
 	}
 
@@ -241,7 +242,7 @@
 
 		final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
 
-		final TaskSlotTable taskSlotTable = createTaskSlotTable(
+		final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
 			taskManagerServicesConfiguration.getNumberOfSlots(),
 			taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
 			taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
@@ -278,7 +279,7 @@
 			taskEventDispatcher);
 	}
 
-	private static TaskSlotTable createTaskSlotTable(
+	private static TaskSlotTable<Task> createTaskSlotTable(
 			final int numberOfSlots,
 			final TaskExecutorResourceSpec taskExecutorResourceSpec,
 			final long timerServiceShutdownTimeout,
@@ -286,7 +287,7 @@
 		final TimerService<AllocationID> timerService = new TimerService<>(
 			new ScheduledThreadPoolExecutor(1),
 			timerServiceShutdownTimeout);
-		return new TaskSlotTable(
+		return new TaskSlotTable<>(
 			numberOfSlots,
 			TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(taskExecutorResourceSpec),
 			TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(taskExecutorResourceSpec, numberOfSlots),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index edfe899..90e640d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -24,7 +24,6 @@
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -36,7 +35,7 @@
 import java.util.Map;
 
 /**
- * Container for multiple {@link Task} belonging to the same slot. A {@link TaskSlot} can be in one
+ * Container for multiple {@link TaskSlotPayload tasks} belonging to the same slot. A {@link TaskSlot} can be in one
  * of the following states:
  * <ul>
  *     <li>Free - The slot is empty and not allocated to a job</li>
@@ -53,8 +52,10 @@
  *
  * <p>An allocated or active slot can only be freed if it is empty. If it is not empty, then it's state
  * can be set to releasing indicating that it can be freed once it becomes empty.
+ *
+ * @param <T> type of the {@link TaskSlotPayload} stored in this slot
  */
-public class TaskSlot implements AutoCloseable {
+public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseable {
 	private static final Logger LOG = LoggerFactory.getLogger(TaskSlot.class);
 
 	/** Index of the task slot. */
@@ -64,7 +65,7 @@
 	private final ResourceProfile resourceProfile;
 
 	/** Tasks running in this slot. */
-	private final Map<ExecutionAttemptID, Task> tasks;
+	private final Map<ExecutionAttemptID, T> tasks;
 
 	private final MemoryManager memoryManager;
 
@@ -150,7 +151,7 @@
 	 *
 	 * @return Iterator to all currently contained tasks in this task slot.
 	 */
-	public Iterator<Task> getTasks() {
+	public Iterator<T> getTasks() {
 		return tasks.values().iterator();
 	}
 
@@ -175,7 +176,7 @@
 	 * @throws IllegalStateException if the task slot is not in state active
 	 * @return true if the task was added to the task slot; otherwise false
 	 */
-	public boolean add(Task task) {
+	public boolean add(T task) {
 		// Check that this slot has been assigned to the job sending this task
 		Preconditions.checkArgument(task.getJobID().equals(jobId), "The task's job id does not match the " +
 			"job id for which the slot has been allocated.");
@@ -183,7 +184,7 @@
 			"id does not match the allocation id for which the slot has been allocated.");
 		Preconditions.checkState(TaskSlotState.ACTIVE == state, "The task slot is not in state active.");
 
-		Task oldTask = tasks.put(task.getExecutionId(), task);
+		T oldTask = tasks.put(task.getExecutionId(), task);
 
 		if (oldTask != null) {
 			tasks.put(task.getExecutionId(), oldTask);
@@ -199,7 +200,7 @@
 	 * @param executionAttemptId identifying the task to be removed
 	 * @return The removed task if there was any; otherwise null.
 	 */
-	public Task remove(ExecutionAttemptID executionAttemptId) {
+	public T remove(ExecutionAttemptID executionAttemptId) {
 		return tasks.remove(executionAttemptId);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotPayload.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotPayload.java
new file mode 100644
index 0000000..9af2881
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotPayload.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Payload interface for {@link org.apache.flink.runtime.taskexecutor.slot.TaskSlot}.
+ */
+public interface TaskSlotPayload {
+	JobID getJobID();
+
+	ExecutionAttemptID getExecutionId();
+
+	AllocationID getAllocationId();
+
+	CompletableFuture<?> getTerminationFuture();
+
+	/**
+	 * Fail the payload with the given throwable. This operation should
+	 * eventually complete the termination future.
+	 *
+	 * @param cause of the failure
+	 */
+	void failExternally(Throwable cause);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 5053040..00b9691 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -30,7 +30,6 @@
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -58,7 +57,7 @@
  *
  * <p>Before the task slot table can be used, it must be started via the {@link #start} method.
  */
-public class TaskSlotTable implements TimeoutListener<AllocationID> {
+public class TaskSlotTable<T extends TaskSlotPayload> implements TimeoutListener<AllocationID> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);
 
@@ -79,13 +78,13 @@
 	private final TimerService<AllocationID> timerService;
 
 	/** The list of all task slots. */
-	private final Map<Integer, TaskSlot> taskSlots;
+	private final Map<Integer, TaskSlot<T>> taskSlots;
 
 	/** Mapping from allocation id to task slot. */
-	private final Map<AllocationID, TaskSlot> allocatedSlots;
+	private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
 
 	/** Mapping from execution attempt id to task and task slot. */
-	private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+	private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;
 
 	/** Mapping from job id to allocated slots for a job. */
 	private final Map<JobID, Set<AllocationID>> slotsPerJob;
@@ -191,7 +190,7 @@
 			SlotID slotId = new SlotID(resourceId, i);
 			SlotStatus slotStatus;
 			if (taskSlots.containsKey(i)) {
-				TaskSlot taskSlot = taskSlots.get(i);
+				TaskSlot<T> taskSlot = taskSlots.get(i);
 
 				slotStatus = new SlotStatus(
 					slotId,
@@ -209,7 +208,7 @@
 			slotStatuses.add(slotStatus);
 		}
 
-		for (TaskSlot taskSlot : allocatedSlots.values()) {
+		for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
 			if (taskSlot.getIndex() < 0) {
 				SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
 				SlotStatus slotStatus = new SlotStatus(
@@ -263,14 +262,14 @@
 
 		Preconditions.checkArgument(index < numberSlots);
 
-		TaskSlot taskSlot = allocatedSlots.get(allocationId);
+		TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
 		if (taskSlot != null) {
 			LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
 			return false;
 		}
 
 		if (taskSlots.containsKey(index)) {
-			TaskSlot duplicatedTaskSlot = taskSlots.get(index);
+			TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
 			LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
 				index,
 				duplicatedTaskSlot.getResourceProfile(),
@@ -293,7 +292,7 @@
 			return false;
 		}
 
-		taskSlot = new TaskSlot(index, resourceProfile, memoryPageSize, jobId, allocationId);
+		taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
 		if (index >= 0) {
 			taskSlots.put(index, taskSlot);
 		}
@@ -328,7 +327,7 @@
 	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
 		checkInit();
 
-		TaskSlot taskSlot = getTaskSlot(allocationId);
+		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null) {
 			if (taskSlot.markActive()) {
@@ -358,7 +357,7 @@
 	public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
 		checkInit();
 
-		TaskSlot taskSlot = getTaskSlot(allocationId);
+		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null) {
 			if (taskSlot.markInactive()) {
@@ -400,7 +399,7 @@
 	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
 		checkInit();
 
-		TaskSlot taskSlot = getTaskSlot(allocationId);
+		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null) {
 			if (LOG.isDebugEnabled()) {
@@ -441,7 +440,7 @@
 				// and set the slot state to releasing so that it gets eventually freed
 				taskSlot.markReleasing();
 
-				Iterator<Task> taskIterator = taskSlot.getTasks();
+				Iterator<T> taskIterator = taskSlot.getTasks();
 
 				while (taskIterator.hasNext()) {
 					taskIterator.next().failExternally(cause);
@@ -476,7 +475,7 @@
 	 * @return True if the given task slot is allocated for the given job and allocation id
 	 */
 	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
-		TaskSlot taskSlot = taskSlots.get(index);
+		TaskSlot<T> taskSlot = taskSlots.get(index);
 		if (taskSlot != null) {
 			return taskSlot.isAllocated(jobId, allocationId);
 		} else if (index < 0) {
@@ -494,7 +493,7 @@
 	 * @return True if the task slot could be marked active.
 	 */
 	public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
-		TaskSlot taskSlot = getTaskSlot(allocationId);
+		TaskSlot<T> taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
 			return taskSlot.markActive();
@@ -529,7 +528,7 @@
 	 * @param jobId for which to return the allocated slots
 	 * @return Iterator of allocated slots.
 	 */
-	public Iterator<TaskSlot> getAllocatedSlots(JobID jobId) {
+	public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
 		return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
 	}
 
@@ -553,7 +552,7 @@
 	 */
 	@Nullable
 	public JobID getOwningJob(AllocationID allocationId) {
-		final TaskSlot taskSlot = getTaskSlot(allocationId);
+		final TaskSlot<T> taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null) {
 			return taskSlot.getJobId();
@@ -574,15 +573,15 @@
 	 * @throws SlotNotActiveException if there was no slot active for task's job and allocation id
 	 * @return True if the task could be added to the task slot; otherwise false
 	 */
-	public boolean addTask(Task task) throws SlotNotFoundException, SlotNotActiveException {
+	public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
 		Preconditions.checkNotNull(task);
 
-		TaskSlot taskSlot = getTaskSlot(task.getAllocationId());
+		TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
 
 		if (taskSlot != null) {
 			if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
 				if (taskSlot.add(task)) {
-					taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping(task, taskSlot));
+					taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));
 
 					return true;
 				} else {
@@ -604,14 +603,14 @@
 	 * @param executionAttemptID identifying the task to remove
 	 * @return The removed task if there is any for the given execution attempt id; otherwise null
 	 */
-	public Task removeTask(ExecutionAttemptID executionAttemptID) {
+	public T removeTask(ExecutionAttemptID executionAttemptID) {
 		checkInit();
 
-		TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
+		TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
 
 		if (taskSlotMapping != null) {
-			Task task = taskSlotMapping.getTask();
-			TaskSlot taskSlot = taskSlotMapping.getTaskSlot();
+			T task = taskSlotMapping.getTask();
+			TaskSlot<T> taskSlot = taskSlotMapping.getTaskSlot();
 
 			taskSlot.remove(task.getExecutionId());
 
@@ -631,8 +630,8 @@
 	 * @param executionAttemptID identifying the requested task
 	 * @return The task for the given execution attempt id if it exist; otherwise null
 	 */
-	public Task getTask(ExecutionAttemptID executionAttemptID) {
-		TaskSlotMapping taskSlotMapping = taskSlotMappings.get(executionAttemptID);
+	public T getTask(ExecutionAttemptID executionAttemptID) {
+		TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.get(executionAttemptID);
 
 		if (taskSlotMapping != null) {
 			return taskSlotMapping.getTask();
@@ -647,7 +646,7 @@
 	 * @param jobId identifying the job of the requested tasks
 	 * @return Iterator over all task for a given job
 	 */
-	public Iterator<Task> getTasks(JobID jobId) {
+	public Iterator<T> getTasks(JobID jobId) {
 		return new TaskIterator(jobId);
 	}
 
@@ -658,7 +657,7 @@
 	 * @return Allocation id of the specified slot if allocated; otherwise null
 	 */
 	public AllocationID getCurrentAllocation(int index) {
-		TaskSlot taskSlot = taskSlots.get(index);
+		TaskSlot<T> taskSlot = taskSlots.get(index);
 		if (taskSlot == null) {
 			return null;
 		}
@@ -672,7 +671,7 @@
 	 * @return the memory manager of the slot allocated for the task
 	 */
 	public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
-		TaskSlot taskSlot = getTaskSlot(allocationID);
+		TaskSlot<T> taskSlot = getTaskSlot(allocationID);
 		if (taskSlot != null) {
 			return taskSlot.getMemoryManager();
 		} else {
@@ -698,7 +697,7 @@
 	// ---------------------------------------------------------------------
 
 	@Nullable
-	private TaskSlot getTaskSlot(AllocationID allocationId) {
+	private TaskSlot<T> getTaskSlot(AllocationID allocationId) {
 		Preconditions.checkNotNull(allocationId);
 
 		return allocatedSlots.get(allocationId);
@@ -713,22 +712,22 @@
 	// ---------------------------------------------------------------------
 
 	/**
-	 * Mapping class between a {@link Task} and a {@link TaskSlot}.
+	 * Mapping class between a {@link TaskSlotPayload} and a {@link TaskSlot}.
 	 */
-	private static final class TaskSlotMapping {
-		private final Task task;
-		private final TaskSlot taskSlot;
+	private static final class TaskSlotMapping<T extends TaskSlotPayload> {
+		private final T task;
+		private final TaskSlot<T> taskSlot;
 
-		private TaskSlotMapping(Task task, TaskSlot taskSlot) {
+		private TaskSlotMapping(T task, TaskSlot<T> taskSlot) {
 			this.task = Preconditions.checkNotNull(task);
 			this.taskSlot = Preconditions.checkNotNull(taskSlot);
 		}
 
-		public Task getTask() {
+		public T getTask() {
 			return task;
 		}
 
-		public TaskSlot getTaskSlot() {
+		public TaskSlot<T> getTaskSlot() {
 			return taskSlot;
 		}
 	}
@@ -738,7 +737,7 @@
 	 * the task slots identified by the allocation ids are in the given state.
 	 */
 	private final class AllocationIDIterator implements Iterator<AllocationID> {
-		private final Iterator<TaskSlot> iterator;
+		private final Iterator<TaskSlot<T>> iterator;
 
 		private AllocationIDIterator(JobID jobId, TaskSlotState state) {
 			iterator = new TaskSlotIterator(jobId, state);
@@ -768,11 +767,11 @@
 	 * Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given
 	 * job.
 	 */
-	private final class TaskSlotIterator implements Iterator<TaskSlot> {
+	private final class TaskSlotIterator implements Iterator<TaskSlot<T>> {
 		private final Iterator<AllocationID> allSlots;
 		private final TaskSlotState state;
 
-		private TaskSlot currentSlot;
+		private TaskSlot<T> currentSlot;
 
 		private TaskSlotIterator(JobID jobId, TaskSlotState state) {
 
@@ -794,7 +793,7 @@
 			while (currentSlot == null && allSlots.hasNext()) {
 				AllocationID tempSlot = allSlots.next();
 
-				TaskSlot taskSlot = getTaskSlot(tempSlot);
+				TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
 
 				if (taskSlot != null && taskSlot.getState() == state) {
 					currentSlot = taskSlot;
@@ -805,9 +804,9 @@
 		}
 
 		@Override
-		public TaskSlot next() {
+		public TaskSlot<T> next() {
 			if (currentSlot != null) {
-				TaskSlot result = currentSlot;
+				TaskSlot<T> result = currentSlot;
 
 				currentSlot = null;
 
@@ -822,7 +821,7 @@
 						throw new NoSuchElementException("No more task slots.");
 					}
 
-					TaskSlot taskSlot = getTaskSlot(tempSlot);
+					TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
 
 					if (taskSlot != null && taskSlot.getState() == state) {
 						return taskSlot;
@@ -838,12 +837,12 @@
 	}
 
 	/**
-	 * Iterator over all {@link Task} for a given job.
+	 * Iterator over all {@link TaskSlotPayload} for a given job.
 	 */
-	private final class TaskIterator implements Iterator<Task> {
-		private final Iterator<TaskSlot> taskSlotIterator;
+	private final class TaskIterator implements Iterator<T> {
+		private final Iterator<TaskSlot<T>> taskSlotIterator;
 
-		private Iterator<Task> currentTasks;
+		private Iterator<T> currentTasks;
 
 		private TaskIterator(JobID jobId) {
 			this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
@@ -854,7 +853,7 @@
 		@Override
 		public boolean hasNext() {
 			while ((currentTasks == null || !currentTasks.hasNext()) && taskSlotIterator.hasNext()) {
-				TaskSlot taskSlot = taskSlotIterator.next();
+				TaskSlot<T> taskSlot = taskSlotIterator.next();
 
 				currentTasks = taskSlot.getTasks();
 			}
@@ -863,9 +862,9 @@
 		}
 
 		@Override
-		public Task next() {
+		public T next() {
 			while ((currentTasks == null || !currentTasks.hasNext())) {
-				TaskSlot taskSlot;
+				TaskSlot<T> taskSlot;
 
 				try {
 					taskSlot = taskSlotIterator.next();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 07577ca..3482e6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -70,6 +70,7 @@
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
@@ -123,7 +124,7 @@
  *
  * <p>Each Task is run by one dedicated thread.
  */
-public class Task implements Runnable, TaskActions, PartitionProducerStateProvider, CheckpointListener, BackPressureSampleableTask {
+public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider, CheckpointListener, BackPressureSampleableTask {
 
 	/** The class logger. */
 	private static final Logger LOG = LoggerFactory.getLogger(Task.class);
@@ -402,6 +403,7 @@
 	//  Accessors
 	// ------------------------------------------------------------------------
 
+	@Override
 	public JobID getJobID() {
 		return jobId;
 	}
@@ -410,10 +412,12 @@
 		return vertexId;
 	}
 
+	@Override
 	public ExecutionAttemptID getExecutionId() {
 		return executionId;
 	}
 
+	@Override
 	public AllocationID getAllocationId() {
 		return allocationId;
 	}
@@ -442,6 +446,7 @@
 		return executingThread;
 	}
 
+	@Override
 	public CompletableFuture<ExecutionState> getTerminationFuture() {
 		return terminationFuture;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index fe0e1c2..b24602f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -64,6 +64,7 @@
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -291,7 +292,7 @@
 				Collections.emptyList(),
 				0);
 
-		final TaskSlotTable taskSlotTable = createTaskSlotTable();
+		final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable();
 
 		final TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
 			false,
@@ -455,7 +456,7 @@
 			TaskManagerRunner.createBackPressureSampleService(configuration, RPC.getScheduledExecutor()));
 	}
 
-	private static TaskSlotTable createTaskSlotTable() {
+	private static TaskSlotTable<Task> createTaskSlotTable() {
 		return TaskSlotUtils.createTaskSlotTable(1, timeout);
 
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 2c4c2b0..cdde9be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -213,7 +213,7 @@
 				.setMetricQueryServiceAddress(metricQueryServiceAddress)
 				.build()) {
 			TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-			TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+			TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
 
 			taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
 			tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get();
@@ -263,7 +263,7 @@
 				.setSlotSize(2)
 				.build()) {
 			TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-			TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+			TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
 
 			taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
 			tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get();
@@ -319,7 +319,7 @@
 				.useRealNonMockShuffleEnvironment()
 				.build()) {
 			TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-			TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+			TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
 
 			taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
 			tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
@@ -386,7 +386,7 @@
 				.useRealNonMockShuffleEnvironment()
 				.build()) {
 			TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-			TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+			TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
 
 			taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
 			tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
@@ -437,7 +437,7 @@
 				.useRealNonMockShuffleEnvironment()
 				.build()) {
 			TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-			TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+			TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
 
 			taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
 			tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
@@ -470,7 +470,7 @@
 				.addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
 				.build()) {
 			TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-			TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+			TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
 
 			taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
 			tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
@@ -524,7 +524,7 @@
 				.useRealNonMockShuffleEnvironment()
 				.build()) {
 			TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-			TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+			TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
 
 			taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
 			tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
@@ -584,7 +584,7 @@
 				.useRealNonMockShuffleEnvironment()
 				.build()) {
 			TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-			TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+			TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
 
 			TestingAbstractInvokables.TestInvokableRecordCancel.resetGotCanceledFuture();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index c055489..423e401 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -101,6 +101,7 @@
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -865,7 +866,7 @@
 	 */
 	@Test
 	public void testJobLeaderDetection() throws Exception {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
 
@@ -943,7 +944,7 @@
 	 */
 	@Test
 	public void testSlotAcceptance() throws Exception {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
 
@@ -1036,7 +1037,7 @@
 	 */
 	@Test
 	public void testSubmitTaskBeforeAcceptSlot() throws Exception {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
 
@@ -1215,7 +1216,7 @@
 		final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
 		final ResourceID rmResourceID = ResourceID.generate();
 
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
 
 		final String rmAddress = "rm";
 		final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
@@ -1267,7 +1268,7 @@
 	 */
 	@Test
 	public void testRemoveJobFromJobLeaderService() throws Exception {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
 
 		final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
@@ -1359,7 +1360,7 @@
 	@Test
 	public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception {
 		configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("100 ms"));
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
 
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
 
@@ -1452,7 +1453,7 @@
 	 */
 	@Test
 	public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
 		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
 		final TaskExecutor taskExecutor = createTaskExecutor(new TaskManagerServicesBuilder()
 			.setTaskSlotTable(taskSlotTable)
@@ -1560,7 +1561,7 @@
 	 */
 	@Test
 	public void testInitialSlotReportFailure() throws Exception {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
 		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskSlotTable(taskSlotTable)
@@ -1614,7 +1615,7 @@
 	 */
 	@Test
 	public void testOfferSlotToJobMasterAfterTimeout() throws Exception {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskSlotTable(taskSlotTable)
 			.build();
@@ -1794,7 +1795,7 @@
 	@Test
 	public void testSyncSlotsWithJobMasterByHeartbeat() throws Exception {
 		final CountDownLatch activeSlots = new CountDownLatch(2);
-		final TaskSlotTable taskSlotTable = new ActivateSlotNotifyingTaskSlotTable(
+		final TaskSlotTable<Task> taskSlotTable = new ActivateSlotNotifyingTaskSlotTable(
 				2,
 				activeSlots);
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
@@ -1968,7 +1969,7 @@
 		final OneShotLatch taskInTerminalState = new OneShotLatch();
 		final TaskManagerActions taskManagerActions = createTaskManagerActionsWithTerminalStateTrigger(taskInTerminalState);
 		final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId, taskManagerActions);
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
 		final TaskExecutor taskExecutor = createTaskExecutor(new TaskManagerServicesBuilder()
 			.setTaskSlotTable(taskSlotTable)
 			.setJobManagerTable(jobManagerTable)
@@ -2016,7 +2017,7 @@
 	}
 
 	private TaskExecutor createTaskExecutor(int numberOFSlots) {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(numberOFSlots);
+		final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(numberOFSlots);
 		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskSlotTable(taskSlotTable)
@@ -2164,7 +2165,7 @@
 		}
 	}
 
-	private static final class TestingTaskSlotTable extends TaskSlotTable {
+	private static final class TestingTaskSlotTable extends TaskSlotTable<Task> {
 		private final Queue<SlotReport> slotReports;
 
 		private TestingTaskSlotTable(Queue<SlotReport> slotReports) {
@@ -2178,7 +2179,7 @@
 		}
 	}
 
-	private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTable {
+	private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTable<Task> {
 
 		private final OneShotLatch allocateSlotLatch;
 
@@ -2204,7 +2205,7 @@
 		}
 	}
 
-	private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTable {
+	private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTable<Task> {
 
 		private final CountDownLatch slotsToActivate;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 1f247046..8b348e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -28,6 +28,7 @@
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import static org.mockito.Mockito.mock;
@@ -43,7 +44,7 @@
 	private ShuffleEnvironment<?, ?> shuffleEnvironment;
 	private KvStateService kvStateService;
 	private BroadcastVariableManager broadcastVariableManager;
-	private TaskSlotTable taskSlotTable;
+	private TaskSlotTable<Task> taskSlotTable;
 	private JobManagerTable jobManagerTable;
 	private JobLeaderService jobLeaderService;
 	private TaskExecutorLocalStateStoresManager taskStateManager;
@@ -56,7 +57,7 @@
 		kvStateService = new KvStateService(new KvStateRegistry(), null, null);
 		broadcastVariableManager = new BroadcastVariableManager();
 		taskEventDispatcher = new TaskEventDispatcher();
-		taskSlotTable = mock(TaskSlotTable.class);
+		taskSlotTable = (TaskSlotTable<Task>) mock(TaskSlotTable.class);
 		jobManagerTable = new JobManagerTable();
 		jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
 		taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
@@ -87,7 +88,7 @@
 		return this;
 	}
 
-	public TaskManagerServicesBuilder setTaskSlotTable(TaskSlotTable taskSlotTable) {
+	public TaskManagerServicesBuilder setTaskSlotTable(TaskSlotTable<Task> taskSlotTable) {
 		this.taskSlotTable = taskSlotTable;
 		return this;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 9c66d2a..1661efb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -94,7 +94,7 @@
 
 	private final TestingHighAvailabilityServices haServices;
 	private final TemporaryFolder temporaryFolder;
-	private final TaskSlotTable taskSlotTable;
+	private final TaskSlotTable<Task> taskSlotTable;
 	private final JobMasterId jobMasterId;
 
 	private TestingTaskExecutor taskExecutor;
@@ -122,7 +122,8 @@
 		if (slotSize > 0) {
 			this.taskSlotTable = TaskSlotUtils.createTaskSlotTable(slotSize);
 		} else {
-			this.taskSlotTable = mock(TaskSlotTable.class);
+			//noinspection unchecked
+			this.taskSlotTable = (TaskSlotTable<Task>) mock(TaskSlotTable.class);
 			when(taskSlotTable.tryMarkSlotActive(eq(jobId), any())).thenReturn(true);
 			when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
 		}
@@ -178,7 +179,7 @@
 		return taskExecutor.getSelfGateway(TaskExecutorGateway.class);
 	}
 
-	public TaskSlotTable getTaskSlotTable() {
+	public TaskSlotTable<Task> getTaskSlotTable() {
 		return taskSlotTable;
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java
index de36e23..2405476 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java
@@ -24,6 +24,7 @@
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 
@@ -40,10 +41,10 @@
 public class TestTaskManagerActions implements TaskManagerActions {
 
 	private final JobMasterGateway jobMasterGateway;
-	private final TaskSlotTable taskSlotTable;
+	private final TaskSlotTable<Task> taskSlotTable;
 	private final TaskManagerActionListeners taskManagerActionListeners = new TaskManagerActionListeners();
 
-	public TestTaskManagerActions(TaskSlotTable taskSlotTable, JobMasterGateway jobMasterGateway) {
+	public TestTaskManagerActions(TaskSlotTable<Task> taskSlotTable, JobMasterGateway jobMasterGateway) {
 		this.taskSlotTable = taskSlotTable;
 		this.jobMasterGateway = jobMasterGateway;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
index 2655b1e..55a71d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
@@ -56,7 +56,7 @@
 	 */
 	@Test
 	public void testTryMarkSlotActive() throws SlotNotFoundException {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
+		final TaskSlotTable<?> taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
 
 		try {
 			taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -94,7 +94,7 @@
 	 */
 	@Test
 	public void testRedundantSlotAllocation() {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
 
 		try {
 			taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -108,7 +108,7 @@
 			assertThat(taskSlotTable.isAllocated(0, jobId, allocationId), is(true));
 			assertThat(taskSlotTable.isSlotFree(1), is(true));
 
-			Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+			Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
 			assertThat(allocatedSlots.next().getIndex(), is(0));
 			assertThat(allocatedSlots.hasNext(), is(false));
 		} finally {
@@ -118,7 +118,7 @@
 
 	@Test
 	public void testFreeSlot() throws SlotNotFoundException {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
 
 		try {
 			taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -132,7 +132,7 @@
 
 			assertThat(taskSlotTable.freeSlot(allocationId2), is(1));
 
-			Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+			Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
 			assertThat(allocatedSlots.next().getIndex(), is(0));
 			assertThat(allocatedSlots.hasNext(), is(false));
 			assertThat(taskSlotTable.isAllocated(1, jobId, allocationId1), is(false));
@@ -145,7 +145,7 @@
 
 	@Test
 	public void testSlotAllocationWithDynamicSlotId() {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
 
 		try {
 			taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -155,7 +155,7 @@
 
 			assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, SLOT_TIMEOUT), is(true));
 
-			Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+			Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
 			assertThat(allocatedSlots.next().getIndex(), is(-1));
 			assertThat(allocatedSlots.hasNext(), is(false));
 			assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true));
@@ -166,7 +166,7 @@
 
 	@Test
 	public void testSlotAllocationWithResourceProfile() {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
 
 		try {
 			taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -178,8 +178,8 @@
 
 			assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, resourceProfile, SLOT_TIMEOUT), is(true));
 
-			Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
-			TaskSlot allocatedSlot = allocatedSlots.next();
+			Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+			TaskSlot<TaskSlotPayload> allocatedSlot = allocatedSlots.next();
 			assertThat(allocatedSlot.getIndex(), is(-1));
 			assertThat(allocatedSlot.getResourceProfile(), is(resourceProfile));
 			assertThat(allocatedSlots.hasNext(), is(false));
@@ -190,7 +190,7 @@
 
 	@Test
 	public void testSlotAllocationWithResourceProfileFailure() {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
 
 		try {
 			taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -202,7 +202,7 @@
 
 			assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, resourceProfile, SLOT_TIMEOUT), is(false));
 
-			Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+			Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
 			assertThat(allocatedSlots.hasNext(), is(false));
 		} finally {
 			taskSlotTable.stop();
@@ -211,7 +211,7 @@
 
 	@Test
 	public void testGenerateSlotReport() throws SlotNotFoundException {
-		final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
+		final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
 
 		try {
 			taskSlotTable.start(new TestingSlotActionsBuilder().build());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
index 8aeb287..27c8d1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
@@ -39,22 +39,22 @@
 		.setNetworkMemory(new MemorySize(100 * 1024))
 		.build();
 
-	public static TaskSlotTable createTaskSlotTable(int numberOfSlots) {
+	public static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(int numberOfSlots) {
 		return createTaskSlotTable(
 			numberOfSlots,
 			createDefaultTimerService(DEFAULT_SLOT_TIMEOUT));
 	}
 
-	public static TaskSlotTable createTaskSlotTable(int numberOfSlots, Time timeout) {
+	public static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(int numberOfSlots, Time timeout) {
 		return createTaskSlotTable(
 			numberOfSlots,
 			createDefaultTimerService(timeout.toMilliseconds()));
 	}
 
-	private static TaskSlotTable createTaskSlotTable(
+	private static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(
 			int numberOfSlots,
 			TimerService<AllocationID> timerService) {
-		return new TaskSlotTable(
+		return new TaskSlotTable<>(
 			numberOfSlots,
 			createTotalResourceProfile(numberOfSlots),
 			DEFAULT_RESOURCE_PROFILE,