blob: 1f1b9eb27391542f5996549dd6dca4a6c2aa01b4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Test;
import javax.ws.rs.core.MediaType;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
public class TestRMWebServicesSchedulerActivities
extends TestRMWebServicesCapacitySched {
private static final Log LOG = LogFactory.getLog(
TestRMWebServicesSchedulerActivities.class);
@Test
public void testAssignMultipleContainersPerNodeHeartbeat()
throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.1:1234");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 11);
JSONArray allocations = json.getJSONArray("allocations");
for (int i = 0; i < allocations.length(); i++) {
if (i != allocations.length() - 1) {
verifyStateOfAllocations(allocations.getJSONObject(i),
"finalAllocationState", "ALLOCATED");
verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1");
}
}
}
finally {
rm.stop();
}
}
@Test
public void testAssignWithoutAvailableResource() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.1");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
rm.stop();
}
}
@Test
public void testNoNM() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
try {
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.1:1234");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
rm.stop();
}
}
@Test
public void testWrongNodeId() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.0");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
rm.stop();
}
}
@Test
public void testReserveNewContainer() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
10)), null);
// Reserve new container
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.2");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b3-b1");
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "finalAllocationState", "RESERVED");
// Do a node heartbeat again without releasing container from app2
r = resource();
params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.2");
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
verifyQueueOrder(json.getJSONObject("allocations"), "b1");
allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "finalAllocationState", "SKIPPED");
// Finish application 2
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ContainerId containerId = ContainerId.newContainerId(
am2.getApplicationAttemptId(), 1);
cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED);
// Do a node heartbeat again
r = resource();
params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.2");
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
verifyQueueOrder(json.getJSONObject("allocations"), "b1");
allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "finalAllocationState",
"ALLOCATED_FROM_RESERVED");
}
finally {
rm.stop();
}
}
@Test
public void testActivityJSON() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.1");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "finalAllocationState",
"ALLOCATED");
verifyNumberOfNodes(allocations, 5);
verifyQueueOrder(json.getJSONObject("allocations"), "root-b-b1");
}
finally {
rm.stop();
}
}
private void verifyNumberOfNodes(JSONObject allocation, int realValue)
throws Exception {
if (allocation.isNull("root")) {
assertEquals("State of allocation is wrong", 0, realValue);
} else {
assertEquals("State of allocation is wrong",
1 + getNumberOfNodes(allocation.getJSONObject("root")), realValue);
}
}
private int getNumberOfNodes(JSONObject allocation) throws Exception {
if (!allocation.isNull("children")) {
Object object = allocation.get("children");
if (object.getClass() == JSONObject.class) {
return 1 + getNumberOfNodes((JSONObject) object);
} else {
int count = 0;
for (int i = 0; i < ((JSONArray) object).length(); i++) {
count += (1 + getNumberOfNodes(
((JSONArray) object).getJSONObject(i)));
}
return count;
}
} else {
return 0;
}
}
private void verifyStateOfAllocations(JSONObject allocation,
String nameToCheck, String realState) throws Exception {
assertEquals("State of allocation is wrong", allocation.get(nameToCheck),
realState);
}
private void verifyNumberOfAllocations(JSONObject json, int realValue)
throws Exception {
if (json.isNull("allocations")) {
assertEquals("Number of allocations is wrong", 0, realValue);
} else {
Object object = json.get("allocations");
if (object.getClass() == JSONObject.class) {
assertEquals("Number of allocations is wrong", 1, realValue);
} else if (object.getClass() == JSONArray.class) {
assertEquals("Number of allocations is wrong",
((JSONArray) object).length(), realValue);
}
}
}
private void verifyQueueOrder(JSONObject json, String realOrder)
throws Exception {
String order = "";
if (!json.isNull("root")) {
JSONObject root = json.getJSONObject("root");
order = root.getString("name") + "-" + getQueueOrder(root);
}
assertEquals("Order of queue is wrong",
order.substring(0, order.length() - 1), realOrder);
}
private String getQueueOrder(JSONObject node) throws Exception {
if (!node.isNull("children")) {
Object children = node.get("children");
if (children.getClass() == JSONObject.class) {
if (!((JSONObject) children).isNull("appPriority")) {
return "";
}
return ((JSONObject) children).getString("name") + "-" + getQueueOrder(
(JSONObject) children);
} else if (children.getClass() == JSONArray.class) {
String order = "";
for (int i = 0; i < ((JSONArray) children).length(); i++) {
JSONObject child = (JSONObject) ((JSONArray) children).get(i);
if (!child.isNull("appPriority")) {
return "";
}
order += (child.getString("name") + "-" + getQueueOrder(child));
}
return order;
}
}
return "";
}
private void verifyNumberOfAllocationAttempts(JSONObject allocation,
int realValue) throws Exception {
if (allocation.isNull("allocationAttempt")) {
assertEquals("Number of allocation attempts is wrong", 0, realValue);
} else {
Object object = allocation.get("allocationAttempt");
if (object.getClass() == JSONObject.class) {
assertEquals("Number of allocations attempts is wrong", 1, realValue);
} else if (object.getClass() == JSONArray.class) {
assertEquals("Number of allocations attempts is wrong",
((JSONArray) object).length(), realValue);
}
}
}
@Test
public void testAppActivityJSON() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
verifyNumberOfAllocationAttempts(allocations, 1);
}
finally {
rm.stop();
}
}
@Test
public void testAppAssignMultipleContainersPerNodeHeartbeat()
throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 10);
JSONArray allocations = json.getJSONArray("allocations");
for (int i = 0; i < allocations.length(); i++) {
verifyStateOfAllocations(allocations.getJSONObject(i),
"allocationState", "ACCEPTED");
}
}
finally {
rm.stop();
}
}
@Test
public void testAppAssignWithoutAvailableResource() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
rm.stop();
}
}
@Test
public void testAppNoNM() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
rm.stop();
}
}
@Test
public void testAppReserveNewContainer() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
10)), null);
// Reserve new container
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
// Do a node heartbeat again without releasing container from app2
r = resource();
params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 2);
// Finish application 2
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ContainerId containerId = ContainerId.newContainerId(
am2.getApplicationAttemptId(), 1);
cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED);
// Do a node heartbeat again
r = resource();
params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 3);
}
finally {
rm.stop();
}
}
}