blob: ee5eead7759f57eae8e3cae29c20cd7cf6703561 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.model.appstate
import groovy.util.logging.Slf4j
import org.apache.hadoop.yarn.api.records.Container
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockFactory
import org.apache.slider.server.appmaster.model.mock.MockRMOperationHandler
import org.apache.slider.server.appmaster.model.mock.MockRoles
import org.apache.slider.server.appmaster.model.mock.MockYarnEngine
import org.apache.slider.server.appmaster.operations.AbstractRMOperation
import org.apache.slider.server.appmaster.operations.CancelRequestOperation
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation
import org.apache.slider.server.appmaster.operations.RMOperationHandler
import org.apache.slider.server.appmaster.state.ContainerAssignment
import org.apache.slider.server.appmaster.state.RoleInstance
import org.junit.Test
import static org.apache.slider.server.appmaster.state.ContainerPriority.buildPriority
import static org.apache.slider.server.appmaster.state.ContainerPriority.extractRole
@Slf4j
class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockRoles {
@Override
String getTestName() {
return "TestMockAppStateRMOperations"
}
@Test
public void testPriorityOnly() throws Throwable {
assert 5 == extractRole(buildPriority(5, false))
}
@Test
public void testPriorityRoundTrip() throws Throwable {
assert 5 == extractRole(buildPriority(5, false))
}
@Test
public void testPriorityRoundTripWithRequest() throws Throwable {
int priority = buildPriority(5, false)
assert 5 == extractRole(priority)
}
@Test
public void testMockAddOp() throws Throwable {
role0Status.desired = 1
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 1)
ContainerRequestOperation operation = (ContainerRequestOperation) ops[0]
int priority = operation.request.priority.priority
assert extractRole(priority) == MockFactory.PROVIDER_ROLE0.id
RMOperationHandler handler = new MockRMOperationHandler()
handler.execute(ops)
AbstractRMOperation op = handler.operations[0]
assert op instanceof ContainerRequestOperation
}
/**
* Test of a flex up and down op which verifies that outstanding
* requests are cancelled first.
* <ol>
* <li>request 5 nodes, assert 5 request made</li>
* <li>allocate 1 of them</li>
* <li>flex cluster size to 3</li>
* <li>assert this generates 2 cancel requests</li>
* </ol>
*/
@Test
public void testRequestThenCancelOps() throws Throwable {
def role0 = role0Status
role0.desired = 5
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 5)
// now 5 outstanding requests.
assert role0.requested == 5
// allocate one
role0.incActual()
role0.decRequested()
assert role0.requested == 4
// flex cluster to 3
role0.desired = 3
ops = appState.reviewRequestAndReleaseNodes()
// expect a cancel operation from review
assertListLength(ops, 1)
assert ops[0] instanceof CancelRequestOperation
RMOperationHandler handler = new MockRMOperationHandler()
handler.availableToCancel = 4;
handler.execute(ops)
assert handler.availableToCancel == 2
assert role0.requested == 2
// flex down one more
role0.desired = 2
ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 1)
assert ops[0] instanceof CancelRequestOperation
handler.execute(ops)
assert handler.availableToCancel == 1
assert role0.requested == 1
}
@Test
public void testCancelNoActualContainers() throws Throwable {
def role0 = role0Status
role0.desired = 5
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 5)
// now 5 outstanding requests.
assert role0.requested == 5
role0.desired = 0
ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 1)
CancelRequestOperation cancel = ops[0] as CancelRequestOperation
assert cancel.count == 5
}
@Test
public void testFlexDownOutstandingRequests() throws Throwable {
// engine only has two nodes, so > 2 will be outstanding
engine = new MockYarnEngine(1, 2)
List<AbstractRMOperation> ops
// role: desired = 2, requested = 1, actual=1
def role0 = role0Status
role0.desired = 4
createAndSubmitNodes()
assert role0.requested == 2
assert role0.actual == 2
// there are now two outstanding, two actual
// Release 3 and verify that the two
// cancellations were combined with a release
role0.desired = 1;
assert role0.delta == -3
ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 2)
assert role0.requested == 0
assert role0.releasing == 1
}
@Test
public void testCancelAllOutstandingRequests() throws Throwable {
// role: desired = 2, requested = 1, actual=1
def role0 = role0Status
role0.desired = 2
role0.incRequested()
role0.incRequested()
List<AbstractRMOperation> ops
// there are now two outstanding, two actual
// Release 3 and verify that the two
// cancellations were combined with a release
role0.desired = 0;
ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 1)
CancelRequestOperation cancel = ops[0] as CancelRequestOperation
assert cancel.getCount() == 2
}
@Test
public void testFlexUpOutstandingRequests() throws Throwable {
// role: desired = 2, requested = 1, actual=1
def role0 = role0Status
role0.desired = 2
role0.incActual();
role0.incRequested()
List<AbstractRMOperation> ops
// flex up 2 nodes, yet expect only one node to be requested,
// as the outstanding request is taken into account
role0.desired = 4;
role0.incRequested()
assert role0.actual == 1;
assert role0.requested == 2;
assert role0.actualAndRequested == 3;
assert role0.delta == 1
ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 1)
assert ops[0] instanceof ContainerRequestOperation
assert role0.requested == 3
}
@Test
public void testFlexUpNoSpace() throws Throwable {
// engine only has two nodes, so > 2 will be outstanding
engine = new MockYarnEngine(1, 2)
List<AbstractRMOperation> ops
// role: desired = 2, requested = 1, actual=1
def role0 = role0Status
role0.desired = 4
createAndSubmitNodes()
assert role0.requested == 2
assert role0.actual == 2
role0.desired = 8;
assert role0.delta == 4
createAndSubmitNodes()
assert role0.requested == 6
}
@Test
public void testAllocateReleaseOp() throws Throwable {
role0Status.desired = 1
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
ContainerRequestOperation operation = (ContainerRequestOperation) ops[0]
def (ArrayList<ContainerAssignment> assignments, Container cont, AMRMClient.ContainerRequest request) = satisfyContainerRequest(
operation)
assertListLength(ops, 1)
assertListLength(assignments, 1)
ContainerAssignment assigned = assignments[0]
Container target = assigned.container
assert target.id == cont.id
int roleId = assigned.role.priority
assert roleId == extractRole(request.priority)
assert assigned.role.name == ROLE0
RoleInstance ri = roleInstance(assigned)
//tell the app it arrived
appState.containerStartSubmitted(target, ri);
appState.innerOnNodeManagerContainerStarted(target.id)
assert role0Status.started == 1
//now release it by changing the role status
role0Status.desired = 0
ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 1)
assert ops[0] instanceof ContainerReleaseOperation
ContainerReleaseOperation release = (ContainerReleaseOperation) ops[0]
assert release.containerId == cont.id
}
public List satisfyContainerRequest(ContainerRequestOperation operation) {
AMRMClient.ContainerRequest request = operation.request
Container cont = engine.allocateContainer(request)
List<Container> allocated = [cont]
List<ContainerAssignment> assignments = [];
List<AbstractRMOperation> operations = []
appState.onContainersAllocated(allocated, assignments, operations)
return [assignments, cont, request]
}
@Test
public void testComplexAllocation() throws Throwable {
role0Status.desired = 1
role1Status.desired = 3
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
List<Container> allocations = engine.execute(ops)
List<ContainerAssignment> assignments = [];
List<AbstractRMOperation> releases = []
appState.onContainersAllocated(allocations, assignments, releases)
assertListLength(releases, 0)
assertListLength(assignments, 4)
assignments.each { ContainerAssignment assigned ->
Container target = assigned.container
RoleInstance ri = roleInstance(assigned)
appState.containerStartSubmitted(target, ri);
}
//insert some async operation here
assignments.each { ContainerAssignment assigned ->
Container target = assigned.container
appState.innerOnNodeManagerContainerStarted(target.id)
}
assert engine.containerCount() == 4;
role1Status.desired = 0
ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 3)
allocations = engine.execute(ops)
assert engine.containerCount() == 1;
appState.onContainersAllocated(allocations, assignments, releases)
assert assignments.empty
assert releases.empty
}
@Test
public void testDoubleNodeManagerStartEvent() throws Throwable {
role0Status.desired = 1
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
List<Container> allocations = engine.execute(ops)
List<ContainerAssignment> assignments = [];
List<AbstractRMOperation> releases = []
appState.onContainersAllocated(allocations, assignments, releases)
assertListLength(assignments, 1)
ContainerAssignment assigned = assignments[0]
Container target = assigned.container
RoleInstance ri = roleInstance(assigned)
appState.containerStartSubmitted(target, ri);
RoleInstance ri2 = appState.innerOnNodeManagerContainerStarted(target.id)
assert ri2 == ri
//try a second time, expect an error
try {
appState.innerOnNodeManagerContainerStarted(target.id)
fail("Expected an exception")
} catch (RuntimeException expected) {
// expected
}
//and non-faulter should not downgrade to a null
log.warn("Ignore any exception/stack trace that appears below")
log.warn("===============================================================")
RoleInstance ri3 = appState.onNodeManagerContainerStarted(target.id)
log.warn("===============================================================")
log.warn("Ignore any exception/stack trace that appeared above")
assert ri3 == null
}
}