blob: cf1940a447657748df2e0a7debe6f2eb7e013061 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.scheduler;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThat;
/**
* Tests for {@link AbstractExecutionSlotAllocator}.
*/
public class AbstractExecutionSlotAllocatorTest extends TestLogger {
private AbstractExecutionSlotAllocator executionSlotAllocator;
@Before
public void setUp() throws Exception {
executionSlotAllocator = new TestingExecutionSlotAllocator();
}
@Test
public void testCancel() {
final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
createSchedulingRequirements(executionVertexId);
final List<SlotExecutionVertexAssignment> assignments =
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
executionSlotAllocator.cancel(executionVertexId);
assertThat(assignments.get(0).getLogicalSlotFuture().isCancelled(), is(true));
}
@Test(expected = IllegalStateException.class)
public void testValidateSchedulingRequirements() {
final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
createSchedulingRequirements(executionVertexId);
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
executionSlotAllocator.validateSchedulingRequirements(schedulingRequirements);
}
@Test
public void testCreateAndRegisterSlotExecutionVertexAssignment() {
final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
createSchedulingRequirements(executionVertexId);
final List<SlotExecutionVertexAssignment> assignments =
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
assertThat(assignments, hasSize(1));
final SlotExecutionVertexAssignment assignment = assignments.get(0);
assertThat(assignment.getExecutionVertexId(), is(executionVertexId));
assertThat(assignment.getLogicalSlotFuture().isDone(), is(false));
assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), contains(assignment));
assignment.getLogicalSlotFuture().cancel(false);
assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
}
@Test
public void testCompletedExecutionVertexAssignmentWillBeUnregistered() {
final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
createSchedulingRequirements(executionVertexId);
final List<SlotExecutionVertexAssignment> assignments =
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
assignments.get(0).getLogicalSlotFuture().cancel(false);
assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
}
@Test
public void testComputeAllPriorAllocationIds() {
final List<AllocationID> expectAllocationIds = Arrays.asList(new AllocationID(), new AllocationID());
final List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements = Arrays.asList(
createSchedulingRequirement(0, expectAllocationIds.get(0)),
createSchedulingRequirement(1, expectAllocationIds.get(0)),
createSchedulingRequirement(2, expectAllocationIds.get(1)),
createSchedulingRequirement(3)
);
final Set<AllocationID> allPriorAllocationIds =
AbstractExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
assertThat(allPriorAllocationIds, containsInAnyOrder(expectAllocationIds.toArray()));
}
private ExecutionVertexSchedulingRequirements createSchedulingRequirement(
final int subtaskIndex) {
return createSchedulingRequirement(subtaskIndex, null);
}
private ExecutionVertexSchedulingRequirements createSchedulingRequirement(
final int subtaskIndex,
@Nullable final AllocationID previousAllocationId) {
return new ExecutionVertexSchedulingRequirements.Builder()
.withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), subtaskIndex))
.withSlotSharingGroupId(new SlotSharingGroupId())
.withPreviousAllocationId(previousAllocationId)
.build();
}
private static class TestingExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
TestingExecutionSlotAllocator() {
super(
new DefaultPreferredLocationsRetriever(
new TestingStateLocationRetriever(),
new TestingInputsLocationsRetriever.Builder().build()));
}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
new ArrayList<>(executionVertexSchedulingRequirements.size());
for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
slotExecutionVertexAssignments.add(
createAndRegisterSlotExecutionVertexAssignment(
schedulingRequirements.getExecutionVertexId(),
new CompletableFuture<>(),
throwable -> {}));
}
return slotExecutionVertexAssignments;
}
}
}