[hotfix] Segregate TaskSlotPayload interface from Task for TaskSlot and TaskSlotTable
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 778669d..fe272ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -196,7 +196,7 @@
// --------- task slot allocation table -----------
- private final TaskSlotTable taskSlotTable;
+ private final TaskSlotTable<Task> taskSlotTable;
private final JobManagerTable jobManagerTable;
@@ -1143,7 +1143,7 @@
final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
- final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
+ final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 91d23b5..1d140b2 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -34,6 +34,7 @@
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -65,7 +66,7 @@
private final ShuffleEnvironment<?, ?> shuffleEnvironment;
private final KvStateService kvStateService;
private final BroadcastVariableManager broadcastVariableManager;
- private final TaskSlotTable taskSlotTable;
+ private final TaskSlotTable<Task> taskSlotTable;
private final JobManagerTable jobManagerTable;
private final JobLeaderService jobLeaderService;
private final TaskExecutorLocalStateStoresManager taskManagerStateStore;
@@ -78,7 +79,7 @@
ShuffleEnvironment<?, ?> shuffleEnvironment,
KvStateService kvStateService,
BroadcastVariableManager broadcastVariableManager,
- TaskSlotTable taskSlotTable,
+ TaskSlotTable<Task> taskSlotTable,
JobManagerTable jobManagerTable,
JobLeaderService jobLeaderService,
TaskExecutorLocalStateStoresManager taskManagerStateStore,
@@ -125,7 +126,7 @@
return broadcastVariableManager;
}
- public TaskSlotTable getTaskSlotTable() {
+ public TaskSlotTable<Task> getTaskSlotTable() {
return taskSlotTable;
}
@@ -241,7 +242,7 @@
final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
- final TaskSlotTable taskSlotTable = createTaskSlotTable(
+ final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
taskManagerServicesConfiguration.getNumberOfSlots(),
taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
@@ -278,7 +279,7 @@
taskEventDispatcher);
}
- private static TaskSlotTable createTaskSlotTable(
+ private static TaskSlotTable<Task> createTaskSlotTable(
final int numberOfSlots,
final TaskExecutorResourceSpec taskExecutorResourceSpec,
final long timerServiceShutdownTimeout,
@@ -286,7 +287,7 @@
final TimerService<AllocationID> timerService = new TimerService<>(
new ScheduledThreadPoolExecutor(1),
timerServiceShutdownTimeout);
- return new TaskSlotTable(
+ return new TaskSlotTable<>(
numberOfSlots,
TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(taskExecutorResourceSpec),
TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(taskExecutorResourceSpec, numberOfSlots),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index edfe899..90e640d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -24,7 +24,6 @@
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -36,7 +35,7 @@
import java.util.Map;
/**
- * Container for multiple {@link Task} belonging to the same slot. A {@link TaskSlot} can be in one
+ * Container for multiple {@link TaskSlotPayload tasks} belonging to the same slot. A {@link TaskSlot} can be in one
* of the following states:
* <ul>
* <li>Free - The slot is empty and not allocated to a job</li>
@@ -53,8 +52,10 @@
*
* <p>An allocated or active slot can only be freed if it is empty. If it is not empty, then it's state
* can be set to releasing indicating that it can be freed once it becomes empty.
+ *
+ * @param <T> type of the {@link TaskSlotPayload} stored in this slot
*/
-public class TaskSlot implements AutoCloseable {
+public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TaskSlot.class);
/** Index of the task slot. */
@@ -64,7 +65,7 @@
private final ResourceProfile resourceProfile;
/** Tasks running in this slot. */
- private final Map<ExecutionAttemptID, Task> tasks;
+ private final Map<ExecutionAttemptID, T> tasks;
private final MemoryManager memoryManager;
@@ -150,7 +151,7 @@
*
* @return Iterator to all currently contained tasks in this task slot.
*/
- public Iterator<Task> getTasks() {
+ public Iterator<T> getTasks() {
return tasks.values().iterator();
}
@@ -175,7 +176,7 @@
* @throws IllegalStateException if the task slot is not in state active
* @return true if the task was added to the task slot; otherwise false
*/
- public boolean add(Task task) {
+ public boolean add(T task) {
// Check that this slot has been assigned to the job sending this task
Preconditions.checkArgument(task.getJobID().equals(jobId), "The task's job id does not match the " +
"job id for which the slot has been allocated.");
@@ -183,7 +184,7 @@
"id does not match the allocation id for which the slot has been allocated.");
Preconditions.checkState(TaskSlotState.ACTIVE == state, "The task slot is not in state active.");
- Task oldTask = tasks.put(task.getExecutionId(), task);
+ T oldTask = tasks.put(task.getExecutionId(), task);
if (oldTask != null) {
tasks.put(task.getExecutionId(), oldTask);
@@ -199,7 +200,7 @@
* @param executionAttemptId identifying the task to be removed
* @return The removed task if there was any; otherwise null.
*/
- public Task remove(ExecutionAttemptID executionAttemptId) {
+ public T remove(ExecutionAttemptID executionAttemptId) {
return tasks.remove(executionAttemptId);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotPayload.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotPayload.java
new file mode 100644
index 0000000..9af2881
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotPayload.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Payload interface for {@link org.apache.flink.runtime.taskexecutor.slot.TaskSlot}.
+ */
+public interface TaskSlotPayload {
+ JobID getJobID();
+
+ ExecutionAttemptID getExecutionId();
+
+ AllocationID getAllocationId();
+
+ CompletableFuture<?> getTerminationFuture();
+
+ /**
+ * Fail the payload with the given throwable. This operation should
+ * eventually complete the termination future.
+ *
+ * @param cause of the failure
+ */
+ void failExternally(Throwable cause);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 5053040..00b9691 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -30,7 +30,6 @@
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -58,7 +57,7 @@
*
* <p>Before the task slot table can be used, it must be started via the {@link #start} method.
*/
-public class TaskSlotTable implements TimeoutListener<AllocationID> {
+public class TaskSlotTable<T extends TaskSlotPayload> implements TimeoutListener<AllocationID> {
private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);
@@ -79,13 +78,13 @@
private final TimerService<AllocationID> timerService;
/** The list of all task slots. */
- private final Map<Integer, TaskSlot> taskSlots;
+ private final Map<Integer, TaskSlot<T>> taskSlots;
/** Mapping from allocation id to task slot. */
- private final Map<AllocationID, TaskSlot> allocatedSlots;
+ private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
/** Mapping from execution attempt id to task and task slot. */
- private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+ private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;
/** Mapping from job id to allocated slots for a job. */
private final Map<JobID, Set<AllocationID>> slotsPerJob;
@@ -191,7 +190,7 @@
SlotID slotId = new SlotID(resourceId, i);
SlotStatus slotStatus;
if (taskSlots.containsKey(i)) {
- TaskSlot taskSlot = taskSlots.get(i);
+ TaskSlot<T> taskSlot = taskSlots.get(i);
slotStatus = new SlotStatus(
slotId,
@@ -209,7 +208,7 @@
slotStatuses.add(slotStatus);
}
- for (TaskSlot taskSlot : allocatedSlots.values()) {
+ for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
if (taskSlot.getIndex() < 0) {
SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
SlotStatus slotStatus = new SlotStatus(
@@ -263,14 +262,14 @@
Preconditions.checkArgument(index < numberSlots);
- TaskSlot taskSlot = allocatedSlots.get(allocationId);
+ TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
return false;
}
if (taskSlots.containsKey(index)) {
- TaskSlot duplicatedTaskSlot = taskSlots.get(index);
+ TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
index,
duplicatedTaskSlot.getResourceProfile(),
@@ -293,7 +292,7 @@
return false;
}
- taskSlot = new TaskSlot(index, resourceProfile, memoryPageSize, jobId, allocationId);
+ taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
if (index >= 0) {
taskSlots.put(index, taskSlot);
}
@@ -328,7 +327,7 @@
public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
checkInit();
- TaskSlot taskSlot = getTaskSlot(allocationId);
+ TaskSlot<T> taskSlot = getTaskSlot(allocationId);
if (taskSlot != null) {
if (taskSlot.markActive()) {
@@ -358,7 +357,7 @@
public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
checkInit();
- TaskSlot taskSlot = getTaskSlot(allocationId);
+ TaskSlot<T> taskSlot = getTaskSlot(allocationId);
if (taskSlot != null) {
if (taskSlot.markInactive()) {
@@ -400,7 +399,7 @@
public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
checkInit();
- TaskSlot taskSlot = getTaskSlot(allocationId);
+ TaskSlot<T> taskSlot = getTaskSlot(allocationId);
if (taskSlot != null) {
if (LOG.isDebugEnabled()) {
@@ -441,7 +440,7 @@
// and set the slot state to releasing so that it gets eventually freed
taskSlot.markReleasing();
- Iterator<Task> taskIterator = taskSlot.getTasks();
+ Iterator<T> taskIterator = taskSlot.getTasks();
while (taskIterator.hasNext()) {
taskIterator.next().failExternally(cause);
@@ -476,7 +475,7 @@
* @return True if the given task slot is allocated for the given job and allocation id
*/
public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
- TaskSlot taskSlot = taskSlots.get(index);
+ TaskSlot<T> taskSlot = taskSlots.get(index);
if (taskSlot != null) {
return taskSlot.isAllocated(jobId, allocationId);
} else if (index < 0) {
@@ -494,7 +493,7 @@
* @return True if the task slot could be marked active.
*/
public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
- TaskSlot taskSlot = getTaskSlot(allocationId);
+ TaskSlot<T> taskSlot = getTaskSlot(allocationId);
if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
return taskSlot.markActive();
@@ -529,7 +528,7 @@
* @param jobId for which to return the allocated slots
* @return Iterator of allocated slots.
*/
- public Iterator<TaskSlot> getAllocatedSlots(JobID jobId) {
+ public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
}
@@ -553,7 +552,7 @@
*/
@Nullable
public JobID getOwningJob(AllocationID allocationId) {
- final TaskSlot taskSlot = getTaskSlot(allocationId);
+ final TaskSlot<T> taskSlot = getTaskSlot(allocationId);
if (taskSlot != null) {
return taskSlot.getJobId();
@@ -574,15 +573,15 @@
* @throws SlotNotActiveException if there was no slot active for task's job and allocation id
* @return True if the task could be added to the task slot; otherwise false
*/
- public boolean addTask(Task task) throws SlotNotFoundException, SlotNotActiveException {
+ public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
Preconditions.checkNotNull(task);
- TaskSlot taskSlot = getTaskSlot(task.getAllocationId());
+ TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
if (taskSlot != null) {
if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
if (taskSlot.add(task)) {
- taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping(task, taskSlot));
+ taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));
return true;
} else {
@@ -604,14 +603,14 @@
* @param executionAttemptID identifying the task to remove
* @return The removed task if there is any for the given execution attempt id; otherwise null
*/
- public Task removeTask(ExecutionAttemptID executionAttemptID) {
+ public T removeTask(ExecutionAttemptID executionAttemptID) {
checkInit();
- TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
+ TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
if (taskSlotMapping != null) {
- Task task = taskSlotMapping.getTask();
- TaskSlot taskSlot = taskSlotMapping.getTaskSlot();
+ T task = taskSlotMapping.getTask();
+ TaskSlot<T> taskSlot = taskSlotMapping.getTaskSlot();
taskSlot.remove(task.getExecutionId());
@@ -631,8 +630,8 @@
* @param executionAttemptID identifying the requested task
* @return The task for the given execution attempt id if it exist; otherwise null
*/
- public Task getTask(ExecutionAttemptID executionAttemptID) {
- TaskSlotMapping taskSlotMapping = taskSlotMappings.get(executionAttemptID);
+ public T getTask(ExecutionAttemptID executionAttemptID) {
+ TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.get(executionAttemptID);
if (taskSlotMapping != null) {
return taskSlotMapping.getTask();
@@ -647,7 +646,7 @@
* @param jobId identifying the job of the requested tasks
* @return Iterator over all task for a given job
*/
- public Iterator<Task> getTasks(JobID jobId) {
+ public Iterator<T> getTasks(JobID jobId) {
return new TaskIterator(jobId);
}
@@ -658,7 +657,7 @@
* @return Allocation id of the specified slot if allocated; otherwise null
*/
public AllocationID getCurrentAllocation(int index) {
- TaskSlot taskSlot = taskSlots.get(index);
+ TaskSlot<T> taskSlot = taskSlots.get(index);
if (taskSlot == null) {
return null;
}
@@ -672,7 +671,7 @@
* @return the memory manager of the slot allocated for the task
*/
public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
- TaskSlot taskSlot = getTaskSlot(allocationID);
+ TaskSlot<T> taskSlot = getTaskSlot(allocationID);
if (taskSlot != null) {
return taskSlot.getMemoryManager();
} else {
@@ -698,7 +697,7 @@
// ---------------------------------------------------------------------
@Nullable
- private TaskSlot getTaskSlot(AllocationID allocationId) {
+ private TaskSlot<T> getTaskSlot(AllocationID allocationId) {
Preconditions.checkNotNull(allocationId);
return allocatedSlots.get(allocationId);
@@ -713,22 +712,22 @@
// ---------------------------------------------------------------------
/**
- * Mapping class between a {@link Task} and a {@link TaskSlot}.
+ * Mapping class between a {@link TaskSlotPayload} and a {@link TaskSlot}.
*/
- private static final class TaskSlotMapping {
- private final Task task;
- private final TaskSlot taskSlot;
+ private static final class TaskSlotMapping<T extends TaskSlotPayload> {
+ private final T task;
+ private final TaskSlot<T> taskSlot;
- private TaskSlotMapping(Task task, TaskSlot taskSlot) {
+ private TaskSlotMapping(T task, TaskSlot<T> taskSlot) {
this.task = Preconditions.checkNotNull(task);
this.taskSlot = Preconditions.checkNotNull(taskSlot);
}
- public Task getTask() {
+ public T getTask() {
return task;
}
- public TaskSlot getTaskSlot() {
+ public TaskSlot<T> getTaskSlot() {
return taskSlot;
}
}
@@ -738,7 +737,7 @@
* the task slots identified by the allocation ids are in the given state.
*/
private final class AllocationIDIterator implements Iterator<AllocationID> {
- private final Iterator<TaskSlot> iterator;
+ private final Iterator<TaskSlot<T>> iterator;
private AllocationIDIterator(JobID jobId, TaskSlotState state) {
iterator = new TaskSlotIterator(jobId, state);
@@ -768,11 +767,11 @@
* Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given
* job.
*/
- private final class TaskSlotIterator implements Iterator<TaskSlot> {
+ private final class TaskSlotIterator implements Iterator<TaskSlot<T>> {
private final Iterator<AllocationID> allSlots;
private final TaskSlotState state;
- private TaskSlot currentSlot;
+ private TaskSlot<T> currentSlot;
private TaskSlotIterator(JobID jobId, TaskSlotState state) {
@@ -794,7 +793,7 @@
while (currentSlot == null && allSlots.hasNext()) {
AllocationID tempSlot = allSlots.next();
- TaskSlot taskSlot = getTaskSlot(tempSlot);
+ TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
if (taskSlot != null && taskSlot.getState() == state) {
currentSlot = taskSlot;
@@ -805,9 +804,9 @@
}
@Override
- public TaskSlot next() {
+ public TaskSlot<T> next() {
if (currentSlot != null) {
- TaskSlot result = currentSlot;
+ TaskSlot<T> result = currentSlot;
currentSlot = null;
@@ -822,7 +821,7 @@
throw new NoSuchElementException("No more task slots.");
}
- TaskSlot taskSlot = getTaskSlot(tempSlot);
+ TaskSlot<T> taskSlot = getTaskSlot(tempSlot);
if (taskSlot != null && taskSlot.getState() == state) {
return taskSlot;
@@ -838,12 +837,12 @@
}
/**
- * Iterator over all {@link Task} for a given job.
+ * Iterator over all {@link TaskSlotPayload} for a given job.
*/
- private final class TaskIterator implements Iterator<Task> {
- private final Iterator<TaskSlot> taskSlotIterator;
+ private final class TaskIterator implements Iterator<T> {
+ private final Iterator<TaskSlot<T>> taskSlotIterator;
- private Iterator<Task> currentTasks;
+ private Iterator<T> currentTasks;
private TaskIterator(JobID jobId) {
this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
@@ -854,7 +853,7 @@
@Override
public boolean hasNext() {
while ((currentTasks == null || !currentTasks.hasNext()) && taskSlotIterator.hasNext()) {
- TaskSlot taskSlot = taskSlotIterator.next();
+ TaskSlot<T> taskSlot = taskSlotIterator.next();
currentTasks = taskSlot.getTasks();
}
@@ -863,9 +862,9 @@
}
@Override
- public Task next() {
+ public T next() {
while ((currentTasks == null || !currentTasks.hasNext())) {
- TaskSlot taskSlot;
+ TaskSlot<T> taskSlot;
try {
taskSlot = taskSlotIterator.next();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 07577ca..3482e6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -70,6 +70,7 @@
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
@@ -123,7 +124,7 @@
*
* <p>Each Task is run by one dedicated thread.
*/
-public class Task implements Runnable, TaskActions, PartitionProducerStateProvider, CheckpointListener, BackPressureSampleableTask {
+public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider, CheckpointListener, BackPressureSampleableTask {
/** The class logger. */
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
@@ -402,6 +403,7 @@
// Accessors
// ------------------------------------------------------------------------
+ @Override
public JobID getJobID() {
return jobId;
}
@@ -410,10 +412,12 @@
return vertexId;
}
+ @Override
public ExecutionAttemptID getExecutionId() {
return executionId;
}
+ @Override
public AllocationID getAllocationId() {
return allocationId;
}
@@ -442,6 +446,7 @@
return executingThread;
}
+ @Override
public CompletableFuture<ExecutionState> getTerminationFuture() {
return terminationFuture;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index fe0e1c2..b24602f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -64,6 +64,7 @@
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
+import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -291,7 +292,7 @@
Collections.emptyList(),
0);
- final TaskSlotTable taskSlotTable = createTaskSlotTable();
+ final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable();
final TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
@@ -455,7 +456,7 @@
TaskManagerRunner.createBackPressureSampleService(configuration, RPC.getScheduledExecutor()));
}
- private static TaskSlotTable createTaskSlotTable() {
+ private static TaskSlotTable<Task> createTaskSlotTable() {
return TaskSlotUtils.createTaskSlotTable(1, timeout);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 2c4c2b0..cdde9be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -213,7 +213,7 @@
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+ TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get();
@@ -263,7 +263,7 @@
.setSlotSize(2)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+ TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get();
@@ -319,7 +319,7 @@
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+ TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
@@ -386,7 +386,7 @@
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+ TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
@@ -437,7 +437,7 @@
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+ TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
@@ -470,7 +470,7 @@
.addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+ TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
@@ -524,7 +524,7 @@
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+ TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
@@ -584,7 +584,7 @@
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+ TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
TestingAbstractInvokables.TestInvokableRecordCancel.resetGotCanceledFuture();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index c055489..423e401 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -101,6 +101,7 @@
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
+import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -865,7 +866,7 @@
*/
@Test
public void testJobLeaderDetection() throws Exception {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
final JobManagerTable jobManagerTable = new JobManagerTable();
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
@@ -943,7 +944,7 @@
*/
@Test
public void testSlotAcceptance() throws Exception {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
final JobManagerTable jobManagerTable = new JobManagerTable();
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
@@ -1036,7 +1037,7 @@
*/
@Test
public void testSubmitTaskBeforeAcceptSlot() throws Exception {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
final JobManagerTable jobManagerTable = new JobManagerTable();
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
@@ -1215,7 +1216,7 @@
final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
final ResourceID rmResourceID = ResourceID.generate();
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
final String rmAddress = "rm";
final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
@@ -1267,7 +1268,7 @@
*/
@Test
public void testRemoveJobFromJobLeaderService() throws Exception {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
@@ -1359,7 +1360,7 @@
@Test
public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception {
configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("100 ms"));
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
@@ -1452,7 +1453,7 @@
*/
@Test
public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final TaskExecutor taskExecutor = createTaskExecutor(new TaskManagerServicesBuilder()
.setTaskSlotTable(taskSlotTable)
@@ -1560,7 +1561,7 @@
*/
@Test
public void testInitialSlotReportFailure() throws Exception {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskSlotTable(taskSlotTable)
@@ -1614,7 +1615,7 @@
*/
@Test
public void testOfferSlotToJobMasterAfterTimeout() throws Exception {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskSlotTable(taskSlotTable)
.build();
@@ -1794,7 +1795,7 @@
@Test
public void testSyncSlotsWithJobMasterByHeartbeat() throws Exception {
final CountDownLatch activeSlots = new CountDownLatch(2);
- final TaskSlotTable taskSlotTable = new ActivateSlotNotifyingTaskSlotTable(
+ final TaskSlotTable<Task> taskSlotTable = new ActivateSlotNotifyingTaskSlotTable(
2,
activeSlots);
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
@@ -1968,7 +1969,7 @@
final OneShotLatch taskInTerminalState = new OneShotLatch();
final TaskManagerActions taskManagerActions = createTaskManagerActionsWithTerminalStateTrigger(taskInTerminalState);
final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId, taskManagerActions);
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
final TaskExecutor taskExecutor = createTaskExecutor(new TaskManagerServicesBuilder()
.setTaskSlotTable(taskSlotTable)
.setJobManagerTable(jobManagerTable)
@@ -2016,7 +2017,7 @@
}
private TaskExecutor createTaskExecutor(int numberOFSlots) {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(numberOFSlots);
+ final TaskSlotTable<Task> taskSlotTable = TaskSlotUtils.createTaskSlotTable(numberOFSlots);
final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskSlotTable(taskSlotTable)
@@ -2164,7 +2165,7 @@
}
}
- private static final class TestingTaskSlotTable extends TaskSlotTable {
+ private static final class TestingTaskSlotTable extends TaskSlotTable<Task> {
private final Queue<SlotReport> slotReports;
private TestingTaskSlotTable(Queue<SlotReport> slotReports) {
@@ -2178,7 +2179,7 @@
}
}
- private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTable {
+ private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTable<Task> {
private final OneShotLatch allocateSlotLatch;
@@ -2204,7 +2205,7 @@
}
}
- private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTable {
+ private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTable<Task> {
private final CountDownLatch slotsToActivate;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 1f247046..8b348e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -28,6 +28,7 @@
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import static org.mockito.Mockito.mock;
@@ -43,7 +44,7 @@
private ShuffleEnvironment<?, ?> shuffleEnvironment;
private KvStateService kvStateService;
private BroadcastVariableManager broadcastVariableManager;
- private TaskSlotTable taskSlotTable;
+ private TaskSlotTable<Task> taskSlotTable;
private JobManagerTable jobManagerTable;
private JobLeaderService jobLeaderService;
private TaskExecutorLocalStateStoresManager taskStateManager;
@@ -56,7 +57,7 @@
kvStateService = new KvStateService(new KvStateRegistry(), null, null);
broadcastVariableManager = new BroadcastVariableManager();
taskEventDispatcher = new TaskEventDispatcher();
- taskSlotTable = mock(TaskSlotTable.class);
+ taskSlotTable = (TaskSlotTable<Task>) mock(TaskSlotTable.class);
jobManagerTable = new JobManagerTable();
jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
@@ -87,7 +88,7 @@
return this;
}
- public TaskManagerServicesBuilder setTaskSlotTable(TaskSlotTable taskSlotTable) {
+ public TaskManagerServicesBuilder setTaskSlotTable(TaskSlotTable<Task> taskSlotTable) {
this.taskSlotTable = taskSlotTable;
return this;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 9c66d2a..1661efb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -94,7 +94,7 @@
private final TestingHighAvailabilityServices haServices;
private final TemporaryFolder temporaryFolder;
- private final TaskSlotTable taskSlotTable;
+ private final TaskSlotTable<Task> taskSlotTable;
private final JobMasterId jobMasterId;
private TestingTaskExecutor taskExecutor;
@@ -122,7 +122,8 @@
if (slotSize > 0) {
this.taskSlotTable = TaskSlotUtils.createTaskSlotTable(slotSize);
} else {
- this.taskSlotTable = mock(TaskSlotTable.class);
+ //noinspection unchecked
+ this.taskSlotTable = (TaskSlotTable<Task>) mock(TaskSlotTable.class);
when(taskSlotTable.tryMarkSlotActive(eq(jobId), any())).thenReturn(true);
when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
}
@@ -178,7 +179,7 @@
return taskExecutor.getSelfGateway(TaskExecutorGateway.class);
}
- public TaskSlotTable getTaskSlotTable() {
+ public TaskSlotTable<Task> getTaskSlotTable() {
return taskSlotTable;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java
index de36e23..2405476 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java
@@ -24,6 +24,7 @@
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
@@ -40,10 +41,10 @@
public class TestTaskManagerActions implements TaskManagerActions {
private final JobMasterGateway jobMasterGateway;
- private final TaskSlotTable taskSlotTable;
+ private final TaskSlotTable<Task> taskSlotTable;
private final TaskManagerActionListeners taskManagerActionListeners = new TaskManagerActionListeners();
- public TestTaskManagerActions(TaskSlotTable taskSlotTable, JobMasterGateway jobMasterGateway) {
+ public TestTaskManagerActions(TaskSlotTable<Task> taskSlotTable, JobMasterGateway jobMasterGateway) {
this.taskSlotTable = taskSlotTable;
this.jobMasterGateway = jobMasterGateway;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
index 2655b1e..55a71d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
@@ -56,7 +56,7 @@
*/
@Test
public void testTryMarkSlotActive() throws SlotNotFoundException {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
+ final TaskSlotTable<?> taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
try {
taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -94,7 +94,7 @@
*/
@Test
public void testRedundantSlotAllocation() {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+ final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
try {
taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -108,7 +108,7 @@
assertThat(taskSlotTable.isAllocated(0, jobId, allocationId), is(true));
assertThat(taskSlotTable.isSlotFree(1), is(true));
- Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+ Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
assertThat(allocatedSlots.next().getIndex(), is(0));
assertThat(allocatedSlots.hasNext(), is(false));
} finally {
@@ -118,7 +118,7 @@
@Test
public void testFreeSlot() throws SlotNotFoundException {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+ final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
try {
taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -132,7 +132,7 @@
assertThat(taskSlotTable.freeSlot(allocationId2), is(1));
- Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+ Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
assertThat(allocatedSlots.next().getIndex(), is(0));
assertThat(allocatedSlots.hasNext(), is(false));
assertThat(taskSlotTable.isAllocated(1, jobId, allocationId1), is(false));
@@ -145,7 +145,7 @@
@Test
public void testSlotAllocationWithDynamicSlotId() {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+ final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
try {
taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -155,7 +155,7 @@
assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, SLOT_TIMEOUT), is(true));
- Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+ Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
assertThat(allocatedSlots.next().getIndex(), is(-1));
assertThat(allocatedSlots.hasNext(), is(false));
assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true));
@@ -166,7 +166,7 @@
@Test
public void testSlotAllocationWithResourceProfile() {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+ final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
try {
taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -178,8 +178,8 @@
assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, resourceProfile, SLOT_TIMEOUT), is(true));
- Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
- TaskSlot allocatedSlot = allocatedSlots.next();
+ Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+ TaskSlot<TaskSlotPayload> allocatedSlot = allocatedSlots.next();
assertThat(allocatedSlot.getIndex(), is(-1));
assertThat(allocatedSlot.getResourceProfile(), is(resourceProfile));
assertThat(allocatedSlots.hasNext(), is(false));
@@ -190,7 +190,7 @@
@Test
public void testSlotAllocationWithResourceProfileFailure() {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
+ final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(2);
try {
taskSlotTable.start(new TestingSlotActionsBuilder().build());
@@ -202,7 +202,7 @@
assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, resourceProfile, SLOT_TIMEOUT), is(false));
- Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+ Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
assertThat(allocatedSlots.hasNext(), is(false));
} finally {
taskSlotTable.stop();
@@ -211,7 +211,7 @@
@Test
public void testGenerateSlotReport() throws SlotNotFoundException {
- final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
+ final TaskSlotTable<TaskSlotPayload> taskSlotTable = TaskSlotUtils.createTaskSlotTable(3);
try {
taskSlotTable.start(new TestingSlotActionsBuilder().build());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
index 8aeb287..27c8d1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
@@ -39,22 +39,22 @@
.setNetworkMemory(new MemorySize(100 * 1024))
.build();
- public static TaskSlotTable createTaskSlotTable(int numberOfSlots) {
+ public static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(int numberOfSlots) {
return createTaskSlotTable(
numberOfSlots,
createDefaultTimerService(DEFAULT_SLOT_TIMEOUT));
}
- public static TaskSlotTable createTaskSlotTable(int numberOfSlots, Time timeout) {
+ public static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(int numberOfSlots, Time timeout) {
return createTaskSlotTable(
numberOfSlots,
createDefaultTimerService(timeout.toMilliseconds()));
}
- private static TaskSlotTable createTaskSlotTable(
+ private static <T extends TaskSlotPayload> TaskSlotTable<T> createTaskSlotTable(
int numberOfSlots,
TimerService<AllocationID> timerService) {
- return new TaskSlotTable(
+ return new TaskSlotTable<>(
numberOfSlots,
createTotalResourceProfile(numberOfSlots),
DEFAULT_RESOURCE_PROFILE,