blob: 7c7985911d1a57effe9bbe1bab5f7b13dda384db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.clusterframework.types;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public class SlotProfileTest extends TestLogger {
private final ResourceProfile resourceProfile = new ResourceProfile(2, 1024);
private final AllocationID aid1 = new AllocationID();
private final AllocationID aid2 = new AllocationID();
private final AllocationID aid3 = new AllocationID();
private final AllocationID aid4 = new AllocationID();
private final AllocationID aidX = new AllocationID();
private final TaskManagerLocation tml1 = new TaskManagerLocation(new ResourceID("tm-1"), InetAddress.getLoopbackAddress(), 42);
private final TaskManagerLocation tml2 = new TaskManagerLocation(new ResourceID("tm-2"), InetAddress.getLoopbackAddress(), 43);
private final TaskManagerLocation tml3 = new TaskManagerLocation(new ResourceID("tm-3"), InetAddress.getLoopbackAddress(), 44);
private final TaskManagerLocation tml4 = new TaskManagerLocation(new ResourceID("tm-4"), InetAddress.getLoopbackAddress(), 45);
private final TaskManagerLocation tmlX = new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46);
private final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
private SimpleSlotContext ssc1 = new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway);
private SimpleSlotContext ssc2 = new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway);
private SimpleSlotContext ssc3 = new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway);
private SimpleSlotContext ssc4 = new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway);
private final Set<SlotContext> candidates = Collections.unmodifiableSet(createCandidates());
private final SchedulingStrategy schedulingStrategy = PreviousAllocationSchedulingStrategy.getInstance();
private Set<SlotContext> createCandidates() {
Set<SlotContext> candidates = new HashSet<>(4);
candidates.add(ssc1);
candidates.add(ssc2);
candidates.add(ssc3);
candidates.add(ssc4);
return candidates;
}
@Test
public void matchNoRequirements() {
SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.emptyList(), Collections.emptyList());
SlotContext match = runMatching(slotProfile);
Assert.assertTrue(candidates.contains(match));
}
@Test
public void matchPreferredLocationNotAvailable() {
SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tmlX), Collections.emptyList());
SlotContext match = runMatching(slotProfile);
Assert.assertTrue(candidates.contains(match));
}
@Test
public void matchPreferredLocation() {
SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml2), Collections.emptyList());
SlotContext match = runMatching(slotProfile);
Assert.assertEquals(ssc2, match);
slotProfile = new SlotProfile(resourceProfile, Arrays.asList(tmlX, tml4), Collections.emptyList());
match = runMatching(slotProfile);
Assert.assertEquals(ssc4, match);
}
@Test
public void matchPreviousAllocationOverridesPreferredLocation() {
SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml2), Collections.singletonList(aid3));
SlotContext match = runMatching(slotProfile);
Assert.assertEquals(ssc3, match);
slotProfile = new SlotProfile(resourceProfile, Arrays.asList(tmlX, tml1), Arrays.asList(aidX, aid2));
match = runMatching(slotProfile);
Assert.assertEquals(ssc2, match);
}
@Test
public void matchPreviousLocationNotAvailable() {
SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml4), Collections.singletonList(aidX));
SlotContext match = runMatching(slotProfile);
Assert.assertEquals(null, match);
}
private SlotContext runMatching(SlotProfile slotProfile) {
return schedulingStrategy.findMatchWithLocality(
slotProfile,
candidates.stream(),
(candidate) -> candidate,
(candidate) -> true,
(candidate, locality) -> candidate);
}
}