[FLINK-9932] Harden slot allocation protocol

Harden slot allocation protocol by accepting task submissions for slots which are only
allocated. Before, it was necessary that the slot was marked as active. This, however,
required that task submissions come strictly after completing the slot offering which
is not the case. With this change, we mark all allocated and active slots as active
if the TaskExecutor receives a task submission.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5062536..3d06efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -445,7 +445,7 @@
 				throw new TaskSubmissionException(message);
 			}
 
-			if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) {
+			if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
 				final String message = "No task slot allocated for job ID " + jobId +
 					" and allocation ID " + tdd.getAllocationId() + '.';
 				log.debug(message);
@@ -1056,18 +1056,6 @@
 
 				while (reservedSlotsIterator.hasNext()) {
 					SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
-					try {
-						if (!taskSlotTable.markSlotActive(offer.getAllocationId())) {
-							// the slot is either free or releasing at the moment
-							final String message = "Could not mark slot " + jobId + " active.";
-							log.debug(message);
-							jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message));
-						}
-					} catch (SlotNotFoundException e) {
-						final String message = "Could not mark slot " + jobId + " active.";
-						jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message));
-						continue;
-					}
 					reservedSlots.add(offer);
 				}
 
@@ -1097,6 +1085,24 @@
 							if (isJobManagerConnectionValid(jobId, jobMasterId)) {
 								// mark accepted slots active
 								for (SlotOffer acceptedSlot : acceptedSlots) {
+									try {
+										if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
+											// the slot is either free or releasing at the moment
+											final String message = "Could not mark slot " + jobId + " active.";
+											log.debug(message);
+											jobMasterGateway.failSlot(
+												getResourceID(),
+												acceptedSlot.getAllocationId(),
+												new FlinkException(message));
+										}
+									} catch (SlotNotFoundException e) {
+										final String message = "Could not mark slot " + jobId + " active.";
+										jobMasterGateway.failSlot(
+											getResourceID(),
+											acceptedSlot.getAllocationId(),
+											new FlinkException(message));
+									}
+
 									reservedSlots.remove(acceptedSlot);
 								}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index c94e278..867203c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -379,17 +379,17 @@
 	}
 
 	/**
-	 * Check whether there exists an active slot for the given job and allocation id.
+	 * Try to mark the specified slot as active if it has been allocated by the given job.
 	 *
 	 * @param jobId of the allocated slot
 	 * @param allocationId identifying the allocation
-	 * @return True if there exists a task slot which is active for the given job and allocation id.
+	 * @return True if the task slot could be marked active.
 	 */
-	public boolean existsActiveSlot(JobID jobId, AllocationID allocationId) {
+	public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
 		TaskSlot taskSlot = getTaskSlot(allocationId);
 
-		if (taskSlot != null) {
-			return taskSlot.isActive(jobId, allocationId);
+		if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
+			return taskSlot.markActive();
 		} else {
 			return false;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index a372a4f..550f31a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -35,8 +35,6 @@
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.execution.Environment;
@@ -69,9 +67,6 @@
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -90,10 +85,10 @@
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -728,7 +723,7 @@
 		jobManagerTable.put(jobId, jobManagerConnection);
 
 		final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
-		when(taskSlotTable.existsActiveSlot(eq(jobId), eq(allocationId))).thenReturn(true);
+		when(taskSlotTable.tryMarkSlotActive(eq(jobId), eq(allocationId))).thenReturn(true);
 		when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
 
 		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
@@ -993,8 +988,8 @@
 
 			assertThat(instanceIDSlotIDAllocationIDTuple3, equalTo(expectedResult));
 
-			assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
-			assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
+			assertTrue(taskSlotTable.tryMarkSlotActive(jobId, allocationId1));
+			assertFalse(taskSlotTable.tryMarkSlotActive(jobId, allocationId2));
 			assertTrue(taskSlotTable.isSlotFree(1));
 		} finally {
 			taskManager.shutDown();
@@ -1011,61 +1006,37 @@
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 
-		final String resourceManagerAddress = "rm";
-		final UUID resourceManagerLeaderId = UUID.randomUUID();
-
-		final String jobManagerAddress = "jm";
-		final JobMasterId jobMasterId = JobMasterId.generate();
-
-		resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, resourceManagerLeaderId);
-		jobManagerLeaderRetriever.notifyListener(jobManagerAddress, jobMasterId.toUUID());
-
 		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
 
 		final CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>> availableSlotFuture = new CompletableFuture<>();
 		resourceManagerGateway.setNotifySlotAvailableConsumer(availableSlotFuture::complete);
 
-		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
-
 		final AllocationID allocationId1 = new AllocationID();
 		final AllocationID allocationId2 = new AllocationID();
 
 		final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
 
-		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+		final OneShotLatch offerSlotsLatch = new OneShotLatch();
+		final OneShotLatch taskInTerminalState = new OneShotLatch();
+		final CompletableFuture<Collection<SlotOffer>> offerResultFuture = new CompletableFuture<>();
+		final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
+			.setOfferSlotsFunction((resourceID, slotOffers) -> {
+				offerSlotsLatch.trigger();
+				return offerResultFuture;
+			})
+			.setUpdateTaskExecutionStateFunction(taskExecutionState -> {
+				if (taskExecutionState.getExecutionState().isTerminal()) {
+					taskInTerminalState.trigger();
+				}
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			})
+			.build();
 
-		when(jobMasterGateway.registerTaskManager(
-			any(String.class),
-			eq(taskManagerLocation),
-			any(Time.class)
-		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId)));
-		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
-		when(jobMasterGateway.updateTaskExecutionState(any(TaskExecutionState.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
 
-		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
-		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
-
-		final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
-		when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
-
-		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
-			jobId,
-			jmResourceId,
-			jobMasterGateway,
-			mock(TaskManagerActions.class),
-			mock(CheckpointResponder.class),
-			libraryCacheManager,
-			mock(ResultPartitionConsumableNotifier.class),
-			mock(PartitionProducerStateChecker.class));
-
-		final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
-		TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
-		when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
-
-		when(taskManagerMetricGroup.addTaskForJob(
-			any(JobID.class), anyString(), any(JobVertexID.class), any(ExecutionAttemptID.class),
-			anyString(), anyInt(), anyInt())
-		).thenReturn(taskMetricGroup);
+		rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
+		rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
 
 		final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
 
@@ -1089,7 +1060,7 @@
 			haServices,
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
-			taskManagerMetricGroup,
+			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
 			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
@@ -1117,7 +1088,7 @@
 				"test task",
 				1,
 				1,
-				TestInvokable.class.getName(),
+				NoOpInvokable.class.getName(),
 				new Configuration());
 
 			SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
@@ -1133,27 +1104,17 @@
 				0,
 				0,
 				null,
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList());
-
-			CompletableFuture<Collection<SlotOffer>> offerResultFuture = new CompletableFuture<>();
-
-			// submit task first and then return acceptance response
-			when(
-				jobMasterGateway.offerSlots(
-					any(ResourceID.class),
-					any(Collection.class),
-					any(Time.class)))
-				.thenReturn(offerResultFuture);
+				Collections.emptyList(),
+				Collections.emptyList());
 
 			// we have to add the job after the TaskExecutor, because otherwise the service has not
 			// been properly started. This will also offer the slots to the job master
-			jobLeaderService.addJob(jobId, jobManagerAddress);
+			jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());
 
-			verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Collection.class), any(Time.class));
+			offerSlotsLatch.await();
 
 			// submit the task without having acknowledge the offered slots
-			tmGateway.submitTask(tdd, jobMasterId, timeout);
+			tmGateway.submitTask(tdd, jobMasterGateway.getFencingToken(), timeout).get();
 
 			// acknowledge the offered slots
 			offerResultFuture.complete(Collections.singleton(offer1));
@@ -1161,9 +1122,12 @@
 			final Tuple3<InstanceID, SlotID, AllocationID> instanceIDSlotIDAllocationIDTuple3 = availableSlotFuture.get();
 			assertThat(instanceIDSlotIDAllocationIDTuple3.f1, equalTo(new SlotID(taskManagerLocation.getResourceID(), 1)));
 
-			assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
-			assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
+			assertTrue(taskSlotTable.tryMarkSlotActive(jobId, allocationId1));
+			assertFalse(taskSlotTable.tryMarkSlotActive(jobId, allocationId2));
 			assertTrue(taskSlotTable.isSlotFree(1));
+
+			// wait for the task completion
+			taskInTerminalState.await();
 		} finally {
 			taskManager.shutDown();
 			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
new file mode 100644
index 0000000..ef3fc06
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link TaskSlotTable}.
+ */
+public class TaskSlotTableTest extends TestLogger {
+
+	private static final Time SLOT_TIMEOUT = Time.seconds(100L);
+
+	/**
+	 * Tests that one can can mark allocated slots as active.
+	 */
+	@Test
+	public void testTryMarkSlotActive() throws SlotNotFoundException {
+		final TaskSlotTable taskSlotTable = createTaskSlotTable(Collections.nCopies(3, ResourceProfile.UNKNOWN));
+
+		try {
+			taskSlotTable.start(new TestingSlotActionsBuilder().build());
+
+			final JobID jobId1 = new JobID();
+			final AllocationID allocationId1 = new AllocationID();
+			taskSlotTable.allocateSlot(0, jobId1, allocationId1, SLOT_TIMEOUT);
+			final AllocationID allocationId2 = new AllocationID();
+			taskSlotTable.allocateSlot(1, jobId1, allocationId2, SLOT_TIMEOUT);
+			final AllocationID allocationId3 = new AllocationID();
+			final JobID jobId2 = new JobID();
+			taskSlotTable.allocateSlot(2, jobId2, allocationId3, SLOT_TIMEOUT);
+
+			taskSlotTable.markSlotActive(allocationId1);
+
+			assertThat(taskSlotTable.isAllocated(0, jobId1, allocationId1), is(true));
+			assertThat(taskSlotTable.isAllocated(1, jobId1, allocationId2), is(true));
+			assertThat(taskSlotTable.isAllocated(2, jobId2, allocationId3), is(true));
+
+			assertThat(IteratorUtils.toList(taskSlotTable.getActiveSlots(jobId1)), is(equalTo(Arrays.asList(allocationId1))));
+
+			assertThat(taskSlotTable.tryMarkSlotActive(jobId1, allocationId1), is(true));
+			assertThat(taskSlotTable.tryMarkSlotActive(jobId1, allocationId2), is(true));
+			assertThat(taskSlotTable.tryMarkSlotActive(jobId1, allocationId3), is(false));
+
+			assertThat(Sets.newHashSet(taskSlotTable.getActiveSlots(jobId1)), is(equalTo(new HashSet<>(Arrays.asList(allocationId2, allocationId1)))));
+		} finally {
+			taskSlotTable.stop();
+		}
+	}
+
+	@Nonnull
+	private TaskSlotTable createTaskSlotTable(final Collection<ResourceProfile> resourceProfiles) {
+		return new TaskSlotTable(
+			resourceProfiles,
+			new TimerService<>(TestingUtils.defaultExecutor(), 10000L));
+	}
+
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingSlotActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingSlotActions.java
new file mode 100644
index 0000000..ad24189
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingSlotActions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import javax.annotation.Nonnull;
+
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * {@link SlotActions} implementation for testing purposes.
+ */
+public class TestingSlotActions implements SlotActions {
+
+	@Nonnull
+	private final Consumer<AllocationID> freeSlotConsumer;
+
+	@Nonnull
+	private final BiConsumer<AllocationID, UUID> timeoutSlotConsumer;
+
+	public TestingSlotActions(@Nonnull Consumer<AllocationID> freeSlotConsumer, @Nonnull BiConsumer<AllocationID, UUID> timeoutSlotConsumer) {
+		this.freeSlotConsumer = freeSlotConsumer;
+		this.timeoutSlotConsumer = timeoutSlotConsumer;
+	}
+
+	@Override
+	public void freeSlot(AllocationID allocationId) {
+		freeSlotConsumer.accept(allocationId);
+	}
+
+	@Override
+	public void timeoutSlot(AllocationID allocationId, UUID ticket) {
+		timeoutSlotConsumer.accept(allocationId, ticket);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingSlotActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingSlotActionsBuilder.java
new file mode 100644
index 0000000..f1bef55
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TestingSlotActionsBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Builder for the {@link TestingSlotActions}.
+ */
+public class TestingSlotActionsBuilder {
+	private Consumer<AllocationID> freeSlotConsumer = ignored -> {};
+	private BiConsumer<AllocationID, UUID> timeoutSlotConsumer = (ignoredA, ignoredB) -> {};
+
+	public TestingSlotActionsBuilder setFreeSlotConsumer(Consumer<AllocationID> freeSlotConsumer) {
+		this.freeSlotConsumer = freeSlotConsumer;
+		return this;
+	}
+
+	public TestingSlotActionsBuilder setTimeoutSlotConsumer(BiConsumer<AllocationID, UUID> timeoutSlotConsumer) {
+		this.timeoutSlotConsumer = timeoutSlotConsumer;
+		return this;
+	}
+
+	public TestingSlotActions build() {
+		return new TestingSlotActions(freeSlotConsumer, timeoutSlotConsumer);
+	}
+}