blob: f4241b530d64dc2b4eee287d5cf9d27ea1150585 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LogAggregationContextInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.LoggingFilter;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONMarshaller;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
@RunWith(Parameterized.class)
public class TestRMWebServicesAppsModification extends JerseyTestBase {
// Resource manager under test; assigned by TestServletModule.configureServlets()
// whenever an injector is built.
private static MockRM rm;
// Size (per its name, in MB) passed to rm.submitApp(...) by the tests.
private static final int CONTAINER_MB = 1024;
// Injector returned by GuiceServletConfig.getInjector(); chosen in the
// constructor according to the parameterized run number.
private static Injector injector;
// Value sent as the "user.name" query parameter when authentication is on.
private String webserviceUserName = "testuser";
// True when the current injector installs TestRMCustomAuthFilter.
private boolean setAuthFilter = false;
// Scratch directory: the "test.build.data" system property, or /tmp if unset.
private static final String TEST_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).getAbsolutePath();
// Allocation file written by FairTestServletModule for the fair scheduler runs.
private static final String FS_ALLOC_FILE = new File(TEST_DIR,
"test-fs-queues.xml").getAbsolutePath();
/**
 * Servlet context listener that exposes the statically held injector. It is
 * registered through contextListenerClass(...) in the test constructor.
 */
public static class GuiceServletConfig extends GuiceServletContextListener {
@Override
protected Injector getInjector() {
// The static injector is (re)assigned by the test constructor for each run.
return injector;
}
}
/**
 * Authentication filter used by the runs that exercise RM web services
 * requiring an authenticated caller. It is installed as a filter in the Guice
 * injector built for the MockRM.
 */
@Singleton
public static class TestRMCustomAuthFilter extends AuthenticationFilter {
  /**
   * Collects the init parameters whose names start with {@code configPrefix}
   * (stored with the prefix removed), then forces the "simple" auth type with
   * anonymous access disallowed.
   */
  @Override
  protected Properties getConfiguration(String configPrefix,
      FilterConfig filterConfig) throws ServletException {
    Properties filterProps = new Properties();
    for (Enumeration<?> params = filterConfig.getInitParameterNames();
        params.hasMoreElements();) {
      String paramName = (String) params.nextElement();
      if (!paramName.startsWith(configPrefix)) {
        // Not addressed to this filter; ignore it.
        continue;
      }
      String paramValue = filterConfig.getInitParameter(paramName);
      filterProps.put(paramName.substring(configPrefix.length()), paramValue);
    }
    // These two settings always win over whatever the init parameters said.
    filterProps.put(AuthenticationFilter.AUTH_TYPE, "simple");
    filterProps.put(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "false");
    return filterProps;
  }
}
/**
 * Base Guice module for these tests. It creates the MockRM from {@code conf},
 * binds the web service classes, optionally installs the authentication
 * filter, and serves every path through the Guice container.
 */
private abstract class TestServletModule extends ServletModule {
  public Configuration conf = new Configuration();

  /** Lets each subclass select and configure its scheduler on {@code conf}. */
  public abstract void configureScheduler();

  @Override
  protected void configureServlets() {
    // The scheduler choice must be in conf before the RM is constructed.
    configureScheduler();
    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
    rm = new MockRM(conf);

    bind(JAXBContextResolver.class);
    bind(RMWebServices.class);
    bind(GenericExceptionHandler.class);
    bind(ResourceManager.class).toInstance(rm);

    if (setAuthFilter) {
      filter("/*").through(TestRMCustomAuthFilter.class);
    }
    serve("/*").with(GuiceContainer.class);
  }
}
/** Module variant that runs the RM with the {@link CapacityScheduler}. */
private class CapTestServletModule extends TestServletModule {
  @Override
  public void configureScheduler() {
    final String schedulerClass = CapacityScheduler.class.getName();
    conf.set(YarnConfiguration.RM_SCHEDULER, schedulerClass);
  }
}
private class FairTestServletModule extends TestServletModule {
@Override
public void configureScheduler() {
try {
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\">");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" <queue name=\"default\">");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" </queue>");
out.println(" <queue name=\"test\">");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
} catch(IOException e) {
}
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
}
}
/** Builds the capacity scheduler injector without the authentication filter. */
private Injector getNoAuthInjectorCap() {
  CapTestServletModule module = new CapTestServletModule() {
    @Override
    protected void configureServlets() {
      setAuthFilter = false;
      super.configureServlets();
    }
  };
  return Guice.createInjector(module);
}
/**
 * Builds the capacity scheduler injector with the authentication filter
 * installed and YARN ACLs enabled.
 */
private Injector getSimpleAuthInjectorCap() {
  CapTestServletModule module = new CapTestServletModule() {
    @Override
    protected void configureServlets() {
      conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
      // Restrict the admin ACL; otherwise every user counts as an admin and
      // authorization cannot be tested.
      conf.setStrings(YarnConfiguration.YARN_ADMIN_ACL, "testuser1");
      setAuthFilter = true;
      super.configureServlets();
    }
  };
  return Guice.createInjector(module);
}
/** Builds the fair scheduler injector without the authentication filter. */
private Injector getNoAuthInjectorFair() {
  FairTestServletModule module = new FairTestServletModule() {
    @Override
    protected void configureServlets() {
      setAuthFilter = false;
      super.configureServlets();
    }
  };
  return Guice.createInjector(module);
}
/**
 * Builds the fair scheduler injector with the authentication filter
 * installed and YARN ACLs enabled.
 */
private Injector getSimpleAuthInjectorFair() {
  FairTestServletModule module = new FairTestServletModule() {
    @Override
    protected void configureServlets() {
      conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
      // Restrict the admin ACL; otherwise every user counts as an admin and
      // authorization cannot be tested.
      conf.setStrings(YarnConfiguration.YARN_ADMIN_ACL, "testuser1");
      setAuthFilter = true;
      super.configureServlets();
    }
  };
  return Guice.createInjector(module);
}
/**
 * Supplies the run numbers 0 to 3; the constructor maps each number to one
 * scheduler/authentication combination.
 */
@Parameters
public static Collection<Object[]> guiceConfigs() {
  List<Object[]> runs = new ArrayList<>();
  for (int run = 0; run < 4; run++) {
    runs.add(new Object[] { run });
  }
  return runs;
}
// Runs the superclass set-up; no additional per-test initialization is done here.
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
/**
 * Sets up the Jersey web application descriptor and selects the injector for
 * this parameterized run.
 *
 * @param run 1 = simple auth + capacity scheduler, 2 = no auth + fair
 *        scheduler, 3 = simple auth + fair scheduler; 0 and any other value
 *        = no auth + capacity scheduler
 */
public TestRMWebServicesAppsModification(int run) {
  super(new WebAppDescriptor.Builder(
      "org.apache.hadoop.yarn.server.resourcemanager.webapp")
      .contextListenerClass(GuiceServletConfig.class)
      .filterClass(com.google.inject.servlet.GuiceFilter.class)
      .clientConfig(new DefaultClientConfig(JAXBContextResolver.class))
      .contextPath("jersey-guice-filter").servletPath("/").build());
  switch (run) {
    case 1:
      // Simple Auth Capacity Scheduler
      injector = getSimpleAuthInjectorCap();
      break;
    case 2:
      // No Auth Fair Scheduler
      injector = getNoAuthInjectorFair();
      break;
    case 3:
      // Simple Auth Fair Scheduler
      injector = getSimpleAuthInjectorFair();
      break;
    default:
      // Run 0 (and anything unexpected): No Auth Capacity Scheduler
      injector = getNoAuthInjectorCap();
      break;
  }
}
/**
 * @return the value of setAuthFilter, i.e. whether the injector for this run
 *         installs the authentication filter
 */
private boolean isAuthenticationEnabled() {
return setAuthFilter;
}
/**
 * Appends each path segment to {@code r} and, when authentication is enabled,
 * adds the "user.name" query parameter carrying the web service user.
 *
 * @param r the resource to extend
 * @param paths path segments appended in order
 * @return the extended resource
 */
private WebResource constructWebResource(WebResource r, String... paths) {
  WebResource target = r;
  for (int i = 0; i < paths.length; i++) {
    target = target.path(paths[i]);
  }
  return isAuthenticationEnabled()
      ? target.queryParam("user.name", webserviceUserName)
      : target;
}
/**
 * Builds a resource below ws/v1/cluster for the given path segments.
 *
 * @param paths path segments appended after ws/v1/cluster
 * @return the resource, with "user.name" added when authentication is enabled
 */
private WebResource constructWebResource(String... paths) {
  WebResource clusterRoot = resource().path("ws").path("v1").path("cluster");
  return constructWebResource(clusterRoot, paths);
}
/**
 * Submits an app and reads its state through the REST API, once per accepted
 * media type, expecting HTTP 200 and the ACCEPTED app state.
 */
@Test
public void testSingleAppState() throws Exception {
  rm.start();
  MockNM nodeManager = rm.registerNode("127.0.0.1:1234", 2048);
  String[] acceptTypes =
      { MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
  for (String acceptType : acceptTypes) {
    RMApp submitted = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
    nodeManager.nodeHeartbeat(true);
    String appId = submitted.getApplicationId().toString();
    ClientResponse stateResponse =
        constructWebResource("apps", appId, "state")
            .accept(acceptType).get(ClientResponse.class);
    assertEquals(Status.OK, stateResponse.getClientResponseStatus());
    if (acceptType.equals(MediaType.APPLICATION_JSON)) {
      verifyAppStateJson(stateResponse, RMAppState.ACCEPTED);
    } else if (acceptType.equals(MediaType.APPLICATION_XML)) {
      verifyAppStateXML(stateResponse, RMAppState.ACCEPTED);
    }
  }
  rm.stop();
}
/**
 * Kills an app by PUTting a KILLED state, for every combination of accepted
 * media type and request content type. Without authentication the PUT must
 * be rejected with UNAUTHORIZED; with authentication the request is repeated
 * until it returns OK and the app is KILLED with the supplied diagnostic.
 */
@Test(timeout = 120000)
public void testSingleAppKill() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
String diagnostic = "message1";
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
amNodeManager.nodeHeartbeat(true);
AppState targetState =
new AppState(YarnApplicationState.KILLED.toString());
targetState.setDiagnostics(diagnostic);
// JSON bodies are marshalled by hand; for XML the AppState object itself
// is handed to the client.
Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appStateToJSON(targetState);
} else {
entity = targetState;
}
ClientResponse response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"state").entity(entity, contentType).accept(mediaType)
.put(ClientResponse.class);
// Without the auth filter the kill request must be refused; nothing more
// to check for this combination.
if (!isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
continue;
}
assertEquals(Status.ACCEPTED, response.getClientResponseStatus());
// Any of these states is accepted in the first reply.
if (mediaType.equals(MediaType.APPLICATION_JSON)) {
verifyAppStateJson(response, RMAppState.FINAL_SAVING,
RMAppState.KILLED, RMAppState.KILLING, RMAppState.ACCEPTED);
} else {
verifyAppStateXML(response, RMAppState.FINAL_SAVING,
RMAppState.KILLED, RMAppState.KILLING, RMAppState.ACCEPTED);
}
// The Location header must point at this app's state resource and be
// readable with a plain GET.
String locationHeaderValue =
response.getHeaders().getFirst(HttpHeaders.LOCATION);
Client c = Client.create();
WebResource tmp = c.resource(locationHeaderValue);
if (isAuthenticationEnabled()) {
tmp = tmp.queryParam("user.name", webserviceUserName);
}
response = tmp.get(ClientResponse.class);
assertEquals(Status.OK, response.getClientResponseStatus());
assertTrue(locationHeaderValue.endsWith("/ws/v1/cluster/apps/"
+ app.getApplicationId().toString() + "/state"));
// Repeat the PUT every 100 ms until it answers OK; the loop itself has no
// bound and relies on the @Test timeout above.
while (true) {
Thread.sleep(100);
response =
this
.constructWebResource("apps",
app.getApplicationId().toString(), "state").accept(mediaType)
.entity(entity, contentType).put(ClientResponse.class);
assertTrue((response.getClientResponseStatus() == Status.ACCEPTED)
|| (response.getClientResponseStatus() == Status.OK));
if (response.getClientResponseStatus() == Status.OK) {
assertEquals(RMAppState.KILLED, app.getState());
if (mediaType.equals(MediaType.APPLICATION_JSON)) {
verifyAppStateJson(response, RMAppState.KILLED);
} else {
verifyAppStateXML(response, RMAppState.KILLED);
}
assertTrue("Diagnostic message is incorrect",
app.getDiagnostics().toString().contains(diagnostic));
break;
}
}
}
}
rm.stop();
}
/**
 * PUTs target states that cannot be requested (FINISHED and an unknown
 * string) for every media/content type combination and expects BAD_REQUEST,
 * or UNAUTHORIZED when authentication is disabled.
 */
@Test
public void testSingleAppKillInvalidState() throws Exception {
  rm.start();
  MockNM nodeManager = rm.registerNode("127.0.0.1:1234", 2048);
  String[] acceptTypes =
      { MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
  MediaType[] bodyTypes =
      { MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
  String[] rejectedStates =
      { YarnApplicationState.FINISHED.toString(), "blah" };
  for (String acceptType : acceptTypes) {
    for (MediaType bodyType : bodyTypes) {
      for (String rejectedState : rejectedStates) {
        RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
        nodeManager.nodeHeartbeat(true);
        AppState requested = new AppState(rejectedState);
        // JSON bodies are marshalled by hand; XML uses the object directly.
        Object payload = bodyType.equals(MediaType.APPLICATION_JSON_TYPE)
            ? appStateToJSON(requested)
            : requested;
        ClientResponse response =
            constructWebResource("apps",
                app.getApplicationId().toString(), "state")
                .entity(payload, bodyType).accept(acceptType)
                .put(ClientResponse.class);
        Status expected = isAuthenticationEnabled()
            ? Status.BAD_REQUEST
            : Status.UNAUTHORIZED;
        assertEquals(expected, response.getClientResponseStatus());
      }
    }
  }
  rm.stop();
}
/**
 * Marshals an {@link AppState} to its JSON text using a JAXB JSON context.
 *
 * @param state the state object to serialize
 * @return the JSON representation
 */
private static String appStateToJSON(AppState state) throws Exception {
  StringWriter json = new StringWriter();
  JSONMarshaller marshaller =
      new JSONJAXBContext(AppState.class).createJSONMarshaller();
  marshaller.marshallToJSON(state, json);
  return json.toString();
}
/**
 * Asserts that the response is JSON with exactly one element and that its
 * "state" value equals one of the given states.
 *
 * @param response the response to inspect
 * @param states the acceptable app states
 */
protected static void verifyAppStateJson(ClientResponse response,
    RMAppState... states) throws JSONException {
  assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
  JSONObject body = response.getEntity(JSONObject.class);
  assertEquals("incorrect number of elements", 1, body.length());
  String actual = body.getString("state");
  boolean matched = false;
  for (int i = 0; i < states.length; i++) {
    // Non-short-circuit on purpose: every candidate is still evaluated.
    matched |= states[i].toString().equals(actual);
  }
  assertTrue("app state incorrect, got " + actual, matched);
}
/**
 * Asserts that the response is XML with exactly one "appstate" element and
 * that its "state" value equals one of the given states.
 *
 * @param response the response to inspect
 * @param appStates the acceptable app states
 */
protected static void verifyAppStateXML(ClientResponse response,
    RMAppState... appStates) throws ParserConfigurationException,
    IOException, SAXException {
  assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
  String body = response.getEntity(String.class);
  DocumentBuilder parser =
      DocumentBuilderFactory.newInstance().newDocumentBuilder();
  Document doc = parser.parse(new InputSource(new StringReader(body)));
  NodeList stateNodes = doc.getElementsByTagName("appstate");
  assertEquals("incorrect number of elements", 1, stateNodes.getLength());
  Element stateElement = (Element) stateNodes.item(0);
  String actual = WebServicesTestUtils.getXmlString(stateElement, "state");
  boolean matched = false;
  for (int i = 0; i < appStates.length; i++) {
    // Non-short-circuit on purpose: every candidate is still evaluated.
    matched |= appStates[i].toString().equals(actual);
  }
  assertTrue("app state incorrect, got " + actual, matched);
}
/**
 * Submits an app as "someuser" and tries to kill it as the web service user,
 * expecting FORBIDDEN when authentication is enabled (UNAUTHORIZED
 * otherwise). Runs only on the capacity and fair schedulers.
 */
@Test(timeout = 60000)
public void testSingleAppKillUnauthorized() throws Exception {
  boolean onCapacityScheduler =
      rm.getResourceScheduler() instanceof CapacityScheduler;
  boolean onFairScheduler =
      rm.getResourceScheduler() instanceof FairScheduler;
  assumeTrue("This test is only supported on Capacity and Fair Scheduler",
      onCapacityScheduler || onFairScheduler);
  // The fair scheduler takes its ACLs from ALLOCATION_FILE, so only the
  // capacity scheduler is reconfigured here.
  if (onCapacityScheduler) {
    // By default the root queue gives everyone the admin ACL.
    CapacitySchedulerConfiguration restricted =
        new CapacitySchedulerConfiguration();
    restricted.setAcl("root", QueueACL.ADMINISTER_QUEUE, "someuser");
    restricted.setAcl("root.default", QueueACL.ADMINISTER_QUEUE, "someuser");
    rm.getResourceScheduler().reinitialize(restricted, rm.getRMContext());
  }
  rm.start();
  MockNM nodeManager = rm.registerNode("127.0.0.1:1234", 2048);
  String[] acceptTypes =
      { MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
  for (String acceptType : acceptTypes) {
    RMApp app = rm.submitApp(CONTAINER_MB, "test", "someuser");
    nodeManager.nodeHeartbeat(true);
    String appId = app.getApplicationId().toString();
    // Read the current state, flip it to KILLED and send it back.
    ClientResponse current =
        constructWebResource("apps", appId, "state")
            .accept(acceptType).get(ClientResponse.class);
    AppState killRequest = current.getEntity(AppState.class);
    killRequest.setState(YarnApplicationState.KILLED.toString());
    ClientResponse killResponse =
        constructWebResource("apps", appId, "state")
            .accept(acceptType)
            .entity(killRequest, MediaType.APPLICATION_XML)
            .put(ClientResponse.class);
    validateResponseStatus(killResponse, Status.FORBIDDEN);
  }
  rm.stop();
}
/**
 * Sends kill requests for a well-formed but unknown app id (expects
 * NOT_FOUND) and for a malformed id (expects BAD_REQUEST). Without
 * authentication both must be answered with UNAUTHORIZED.
 */
@Test
public void testSingleAppKillInvalidId() throws Exception {
  rm.start();
  MockNM nodeManager = rm.registerNode("127.0.0.1:1234", 2048);
  nodeManager.nodeHeartbeat(true);
  String[] appIds = { "application_1391705042196_0001", "random_string" };
  // Expected status per id when the caller is authenticated.
  Status[] expectedWithAuth = { Status.NOT_FOUND, Status.BAD_REQUEST };
  for (int idx = 0; idx < appIds.length; idx++) {
    AppState killRequest = new AppState("KILLED");
    ClientResponse response =
        constructWebResource("apps", appIds[idx], "state")
            .accept(MediaType.APPLICATION_XML)
            .entity(killRequest, MediaType.APPLICATION_XML)
            .put(ClientResponse.class);
    Status expected = isAuthenticationEnabled()
        ? expectedWithAuth[idx]
        : Status.UNAUTHORIZED;
    assertEquals(expected, response.getClientResponseStatus());
  }
  rm.stop();
}
/**
 * Stops the MockRM (if one was created) and then runs the superclass
 * tear-down. The superclass tear-down runs even when stopping the RM throws,
 * so a failing stop cannot leave the test container running.
 */
@After
@Override
public void tearDown() throws Exception {
  try {
    if (rm != null) {
      rm.stop();
    }
  } finally {
    super.tearDown();
  }
}
/**
 * Convenience overload for the common case: asserts that the response status
 * is UNAUTHORIZED when the authentication filter is not enabled, or the given
 * status when it is.
 *
 * @param response
 *          the ClientResponse object to be checked
 * @param expectedAuthorizedMode
 *          the status expected when authentication is enabled
 */
public void validateResponseStatus(ClientResponse response,
    Status expectedAuthorizedMode) {
  final Status expectedWithoutAuth = Status.UNAUTHORIZED;
  validateResponseStatus(response, expectedWithoutAuth,
      expectedAuthorizedMode);
}
/**
 * Asserts the response status against the expectation that matches the
 * current run: {@code expectedUnauthorizedMode} when the authentication
 * filter is not enabled, {@code expectedAuthorizedMode} when it is.
 *
 * @param response
 *          the ClientResponse object to be checked
 * @param expectedUnauthorizedMode
 *          the status expected when authentication is not enabled
 * @param expectedAuthorizedMode
 *          the status expected when authentication is enabled
 */
public void validateResponseStatus(ClientResponse response,
    Status expectedUnauthorizedMode, Status expectedAuthorizedMode) {
  Status expected = isAuthenticationEnabled()
      ? expectedAuthorizedMode
      : expectedUnauthorizedMode;
  assertEquals(expected, response.getClientResponseStatus());
}
/**
 * Simple test: POSTs to apps/new-application once per accepted media type and
 * validates each response.
 */
@Test
public void testGetNewApplication() throws Exception {
  client().addFilter(new LoggingFilter(System.out));
  rm.start();
  String[] acceptTypes =
      { MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
  for (int i = 0; i < acceptTypes.length; i++) {
    testGetNewApplication(acceptTypes[i]);
  }
  rm.stop();
}
/**
 * POSTs to apps/new-application and checks the status for the current
 * authentication mode.
 *
 * @param mediaType the media type to accept
 * @return the new application id, or an empty string when authentication is
 *         not enabled
 */
protected String testGetNewApplication(String mediaType) throws JSONException,
    ParserConfigurationException, IOException, SAXException {
  ClientResponse newAppResponse =
      constructWebResource("apps", "new-application").accept(mediaType)
          .post(ClientResponse.class);
  validateResponseStatus(newAppResponse, Status.OK);
  return isAuthenticationEnabled()
      ? validateGetNewApplicationResponse(newAppResponse)
      : "";
}
/**
 * Validates a new-application response according to its media type.
 *
 * @param resp the response to validate; must be JSON or XML
 * @return the application id extracted from the response
 */
protected String validateGetNewApplicationResponse(ClientResponse resp)
    throws JSONException, ParserConfigurationException, IOException,
    SAXException {
  if (resp.getType().equals(MediaType.APPLICATION_JSON_TYPE)) {
    return validateGetNewApplicationJsonResponse(
        resp.getEntity(JSONObject.class));
  }
  if (resp.getType().equals(MediaType.APPLICATION_XML_TYPE)) {
    return validateGetNewApplicationXMLResponse(resp.getEntity(String.class));
  }
  // Any other media type is a test failure.
  Assert.fail();
  return "";
}
/**
 * Checks that the JSON response carries a non-empty "application-id" and a
 * "maximum-resource-capability" with non-zero memory and vCores.
 *
 * @param json the parsed response body
 * @return the application id
 */
protected String validateGetNewApplicationJsonResponse(JSONObject json)
    throws JSONException {
  String newAppId = json.getString("application-id");
  Assert.assertFalse(newAppId.isEmpty());
  JSONObject capability = json.getJSONObject("maximum-resource-capability");
  long maxMemory = capability.getLong("memory");
  long maxVCores = capability.getLong("vCores");
  assertTrue(maxMemory != 0);
  assertTrue(maxVCores != 0);
  return newAppId;
}
/**
 * Checks that the XML response has exactly one "NewApplication" element with
 * a non-empty "application-id" and a single "maximum-resource-capability"
 * whose memory and vCores are non-zero.
 *
 * @param response the raw XML body
 * @return the application id
 */
protected String validateGetNewApplicationXMLResponse(String response)
    throws ParserConfigurationException, IOException, SAXException {
  DocumentBuilder parser =
      DocumentBuilderFactory.newInstance().newDocumentBuilder();
  Document doc = parser.parse(new InputSource(new StringReader(response)));
  NodeList newAppNodes = doc.getElementsByTagName("NewApplication");
  assertEquals("incorrect number of elements", 1, newAppNodes.getLength());
  Element newApp = (Element) newAppNodes.item(0);
  String newAppId = WebServicesTestUtils.getXmlString(newApp, "application-id");
  Assert.assertFalse(newAppId.isEmpty());
  NodeList capabilityNodes =
      newApp.getElementsByTagName("maximum-resource-capability");
  assertEquals(1, capabilityNodes.getLength());
  Element capability = (Element) capabilityNodes.item(0);
  long maxMemory = WebServicesTestUtils.getXmlLong(capability, "memory");
  long maxVCores = WebServicesTestUtils.getXmlLong(capability, "vCores");
  assertTrue(maxMemory != 0);
  assertTrue(maxVCores != 0);
  return newAppId;
}
/**
 * Validates the app submission process, including the expected errors, for
 * every pairing of accepted media type and request content type.
 */
@Test
public void testGetNewApplicationAndSubmit() throws Exception {
  rm.start();
  MockNM nodeManager = rm.registerNode("127.0.0.1:1234", 2048);
  nodeManager.nodeHeartbeat(true);
  String[] mediaTypes =
      { MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
  for (int a = 0; a < mediaTypes.length; a++) {
    for (int c = 0; c < mediaTypes.length; c++) {
      String acceptMedia = mediaTypes[a];
      String contentMedia = mediaTypes[c];
      testAppSubmit(acceptMedia, contentMedia);
      testAppSubmitErrors(acceptMedia, contentMedia);
    }
  }
  rm.stop();
}
/**
 * Obtains a new application id, submits a fully populated submission context
 * through the REST API and compares the resulting RMApp with what was sent.
 * When authentication is not enabled the submission must be rejected with
 * UNAUTHORIZED and nothing else is checked.
 *
 * @param acceptMedia media type requested for responses
 * @param contentMedia media type used for the request body
 */
public void testAppSubmit(String acceptMedia, String contentMedia)
throws Exception {
// create a test app and submit it via rest(after getting an app-id) then
// get the app details from the rmcontext and check that everything matches
client().addFilter(new LoggingFilter(System.out));
String lrKey = "example";
String queueName = "testqueue";
// create the queue
// NOTE(review): a CapacitySchedulerConfiguration is passed to reinitialize()
// in every run, including the fair scheduler ones - confirm this is intended.
String[] queues = { "default", "testqueue" };
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setQueues("root", queues);
csconf.setCapacity("root.default", 50.0f);
csconf.setCapacity("root.testqueue", 50.0f);
rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
String appName = "test";
String appType = "test-type";
String urlPath = "apps";
// Empty string when authentication is not enabled (see testGetNewApplication).
String appId = testGetNewApplication(acceptMedia);
List<String> commands = new ArrayList<>();
commands.add("/bin/sleep 5");
HashMap<String, String> environment = new HashMap<>();
environment.put("APP_VAR", "ENV_SETTING");
HashMap<ApplicationAccessType, String> acls = new HashMap<>();
acls.put(ApplicationAccessType.MODIFY_APP, "testuser1, testuser2");
acls.put(ApplicationAccessType.VIEW_APP, "testuser3, testuser4");
Set<String> tags = new HashSet<>();
tags.add("tag1");
tags.add("tag 2");
// Credentials: no tokens, one Base64-encoded secret.
CredentialsInfo credentials = new CredentialsInfo();
HashMap<String, String> tokens = new HashMap<>();
HashMap<String, String> secrets = new HashMap<>();
secrets.put("secret1", Base64.encodeBase64String(
"mysecret".getBytes("UTF8")));
credentials.setSecrets(secrets);
credentials.setTokens(tokens);
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
appInfo.setApplicationId(appId);
appInfo.setApplicationName(appName);
appInfo.setMaxAppAttempts(2);
appInfo.setQueue(queueName);
appInfo.setApplicationType(appType);
appInfo.setPriority(0);
// One local resource, registered under lrKey.
HashMap<String, LocalResourceInfo> lr = new HashMap<>();
LocalResourceInfo y = new LocalResourceInfo();
y.setUrl(new URI("http://www.test.com/file.txt"));
y.setSize(100);
y.setTimestamp(System.currentTimeMillis());
y.setType(LocalResourceType.FILE);
y.setVisibility(LocalResourceVisibility.APPLICATION);
lr.put(lrKey, y);
appInfo.getContainerLaunchContextInfo().setResources(lr);
appInfo.getContainerLaunchContextInfo().setCommands(commands);
appInfo.getContainerLaunchContextInfo().setEnvironment(environment);
appInfo.getContainerLaunchContextInfo().setAcls(acls);
appInfo.getContainerLaunchContextInfo().getAuxillaryServiceData()
.put("test", Base64.encodeBase64URLSafeString("value12".getBytes("UTF8")));
appInfo.getContainerLaunchContextInfo().setCredentials(credentials);
appInfo.getResource().setMemory(1024);
appInfo.getResource().setvCores(1);
appInfo.setApplicationTags(tags);
// Set LogAggregationContextInfo
String includePattern = "file1";
String excludePattern = "file2";
String rolledLogsIncludePattern = "file3";
String rolledLogsExcludePattern = "file4";
String className = "policy_class";
String parameters = "policy_parameter";
LogAggregationContextInfo logAggregationContextInfo
= new LogAggregationContextInfo();
logAggregationContextInfo.setIncludePattern(includePattern);
logAggregationContextInfo.setExcludePattern(excludePattern);
logAggregationContextInfo.setRolledLogsIncludePattern(
rolledLogsIncludePattern);
logAggregationContextInfo.setRolledLogsExcludePattern(
rolledLogsExcludePattern);
logAggregationContextInfo.setLogAggregationPolicyClassName(className);
logAggregationContextInfo.setLogAggregationPolicyParameters(parameters);
appInfo.setLogAggregationContextInfo(logAggregationContextInfo);
// Set attemptFailuresValidityInterval
long attemptFailuresValidityInterval = 5000;
appInfo.setAttemptFailuresValidityInterval(
attemptFailuresValidityInterval);
// Set ReservationId
String reservationId = ReservationId.newInstance(
System.currentTimeMillis(), 1).toString();
appInfo.setReservationId(reservationId);
// Submit the application.
ClientResponse response =
this.constructWebResource(urlPath).accept(acceptMedia)
.entity(appInfo, contentMedia).post(ClientResponse.class);
// Without authentication the submission is refused and the test ends here.
if (!this.isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
return;
}
assertEquals(Status.ACCEPTED, response.getClientResponseStatus());
assertTrue(!response.getHeaders().getFirst(HttpHeaders.LOCATION).isEmpty());
// The Location header carries the app URL; the id is re-read from it.
String locURL = response.getHeaders().getFirst(HttpHeaders.LOCATION);
assertTrue(locURL.contains("/apps/application"));
appId = locURL.substring(locURL.indexOf("/apps/") + "/apps/".length());
WebResource res = resource().uri(new URI(locURL));
res = res.queryParam("user.name", webserviceUserName);
response = res.get(ClientResponse.class);
assertEquals(Status.OK, response.getClientResponseStatus());
// Compare the RMApp held by the RM context with the submitted values.
RMApp app =
rm.getRMContext().getRMApps()
.get(ApplicationId.fromString(appId));
assertEquals(appName, app.getName());
assertEquals(webserviceUserName, app.getUser());
assertEquals(2, app.getMaxAppAttempts());
// If the reported queue name contains "root.", expect the prefixed form.
if (app.getQueue().contains("root.")) {
queueName = "root." + queueName;
}
assertEquals(queueName, app.getQueue());
assertEquals(appType, app.getApplicationType());
assertEquals(tags, app.getApplicationTags());
ContainerLaunchContext ctx =
app.getApplicationSubmissionContext().getAMContainerSpec();
assertEquals(commands, ctx.getCommands());
assertEquals(environment, ctx.getEnvironment());
assertEquals(acls, ctx.getApplicationACLs());
Map<String, LocalResource> appLRs = ctx.getLocalResources();
assertTrue(appLRs.containsKey(lrKey));
LocalResource exampleLR = appLRs.get(lrKey);
assertEquals(URL.fromURI(y.getUrl()), exampleLR.getResource());
assertEquals(y.getSize(), exampleLR.getSize());
assertEquals(y.getTimestamp(), exampleLR.getTimestamp());
assertEquals(y.getType(), exampleLR.getType());
assertEquals(y.getPattern(), exampleLR.getPattern());
assertEquals(y.getVisibility(), exampleLR.getVisibility());
// Read the credentials back from the AM container spec's token buffer and
// check the submitted secret.
Credentials cs = new Credentials();
ByteArrayInputStream str =
new ByteArrayInputStream(app.getApplicationSubmissionContext()
.getAMContainerSpec().getTokens().array());
DataInputStream di = new DataInputStream(str);
cs.readTokenStorageStream(di);
Text key = new Text("secret1");
assertTrue("Secrets missing from credentials object", cs
.getAllSecretKeys().contains(key));
assertEquals("mysecret", new String(cs.getSecretKey(key), "UTF-8"));
// Check LogAggregationContext
ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
LogAggregationContext lac = asc.getLogAggregationContext();
assertEquals(includePattern, lac.getIncludePattern());
assertEquals(excludePattern, lac.getExcludePattern());
assertEquals(rolledLogsIncludePattern, lac.getRolledLogsIncludePattern());
assertEquals(rolledLogsExcludePattern, lac.getRolledLogsExcludePattern());
assertEquals(className, lac.getLogAggregationPolicyClassName());
assertEquals(parameters, lac.getLogAggregationPolicyParameters());
// Check attemptFailuresValidityInterval
assertEquals(attemptFailuresValidityInterval,
asc.getAttemptFailuresValidityInterval());
// Check ReservationId
assertEquals(reservationId, app.getReservationId().toString());
// Finally, the app must be retrievable through the apps endpoint.
response =
this.constructWebResource("apps", appId).accept(acceptMedia)
.get(ClientResponse.class);
assertEquals(Status.OK, response.getClientResponseStatus());
}
/**
 * Posts a series of submission requests that are well-formed but carry bad
 * values to the "apps" endpoint and checks every reply through
 * {@code validateResponseStatus} with {@code Status.BAD_REQUEST}.
 *
 * @param acceptMedia media type requested for the response
 * @param contentMedia media type used to encode the request entity
 * @throws Exception if a request or one of the status checks fails
 */
public void testAppSubmitErrors(String acceptMedia, String contentMedia)
    throws Exception {
  final String appsPath = "apps";
  ApplicationSubmissionContextInfo submission =
      new ApplicationSubmissionContextInfo();

  // A freshly constructed submission with nothing set on it.
  ClientResponse reply = this.constructWebResource(appsPath)
      .accept(acceptMedia)
      .entity(submission, contentMedia)
      .post(ClientResponse.class);
  validateResponseStatus(reply, Status.BAD_REQUEST);

  // Arbitrary strings used as the application id.
  for (String bogusId : new String[] {"random", "random_junk"}) {
    submission.setApplicationId(bogusId);
    reply = this.constructWebResource(appsPath)
        .accept(acceptMedia)
        .entity(submission, contentMedia)
        .post(ClientResponse.class);
    validateResponseStatus(reply, Status.BAD_REQUEST);
  }

  // bad resource info: memory one above the configured maximum allocation
  int excessiveMemory = rm.getConfig().getInt(
      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB) + 1;
  submission.getResource().setMemory(excessiveMemory);
  submission.getResource().setvCores(1);
  reply = this.constructWebResource(appsPath)
      .accept(acceptMedia)
      .entity(submission, contentMedia)
      .post(ClientResponse.class);
  validateResponseStatus(reply, Status.BAD_REQUEST);

  // vcores one above the configured maximum, memory back to CONTAINER_MB
  int excessiveVCores = rm.getConfig().getInt(
      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES) + 1;
  submission.getResource().setvCores(excessiveVCores);
  submission.getResource().setMemory(CONTAINER_MB);
  reply = this.constructWebResource(appsPath)
      .accept(acceptMedia)
      .entity(submission, contentMedia)
      .post(ClientResponse.class);
  validateResponseStatus(reply, Status.BAD_REQUEST);
}
/**
 * Posts an XML document and a JSON document that do not describe an
 * application submission to the "apps" endpoint and checks the error status
 * of each reply.
 *
 * The XML reply is compared directly against {@code Status.BAD_REQUEST},
 * while the JSON reply goes through {@code validateResponseStatus}; the two
 * checks are intentionally left as they were.
 *
 * @throws Exception if starting the RM, a request, or an assertion fails
 */
@Test
public void testAppSubmitBadJsonAndXML() throws Exception {
  // submit a bunch of bad XML and JSON via the
  // REST API and make sure we get error response codes
  String urlPath = "apps";
  rm.start();
  try {
    MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
    amNodeManager.nodeHeartbeat(true);

    // XML with an unexpected root element
    String body =
        "<?xml version=\"1.0\" encoding=\"UTF-8\" "
            + "standalone=\"yes\"?><blah/>";
    ClientResponse response =
        this.constructWebResource(urlPath).accept(MediaType.APPLICATION_XML)
            .entity(body, MediaType.APPLICATION_XML)
            .post(ClientResponse.class);
    assertEquals(Status.BAD_REQUEST, response.getClientResponseStatus());

    // JSON object with an unrelated key
    body = "{\"a\" : \"b\"}";
    response =
        this.constructWebResource(urlPath).accept(MediaType.APPLICATION_XML)
            .entity(body, MediaType.APPLICATION_JSON)
            .post(ClientResponse.class);
    validateResponseStatus(response, Status.BAD_REQUEST);
  } finally {
    // stop the RM even when one of the checks above fails
    rm.stop();
  }
}
/**
 * Submits one application per response media type (JSON, then XML) and
 * checks that a GET on "apps/{appid}/queue" returns OK and reports the
 * expected queue: "default" when the scheduler is a CapacityScheduler,
 * otherwise "root." followed by the submitting user name.
 *
 * @throws Exception if the RM, a request, or an assertion fails
 */
@Test
public void testGetAppQueue() throws Exception {
  client().addFilter(new LoggingFilter(System.out));
  boolean usingCapacityScheduler =
      rm.getResourceScheduler() instanceof CapacityScheduler;
  rm.start();
  MockNM nodeManager = rm.registerNode("127.0.0.1:1234", 2048);

  for (String acceptType : new String[] {
      MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) {
    RMApp submitted = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
    nodeManager.nodeHeartbeat(true);

    String appId = submitted.getApplicationId().toString();
    ClientResponse reply = this.constructWebResource("apps", appId, "queue")
        .accept(acceptType)
        .get(ClientResponse.class);
    assertEquals(Status.OK, reply.getClientResponseStatus());

    String expectedQueue = usingCapacityScheduler
        ? "default"
        : "root." + webserviceUserName;
    if (acceptType.equals(MediaType.APPLICATION_JSON)) {
      verifyAppQueueJson(reply, expectedQueue);
    } else {
      verifyAppQueueXML(reply, expectedQueue);
    }
  }
  rm.stop();
}
/**
 * Exercises PUT and GET on "apps/{appid}/priority" for every combination of
 * accepted response type and request body type (JSON and XML).
 *
 * The test returns immediately unless the scheduler is a CapacityScheduler.
 * When {@code isAuthenticationEnabled()} is false the PUT is expected to be
 * answered with UNAUTHORIZED and the rest of that iteration is skipped.
 * Otherwise the PUT and a following GET must both report the new priority,
 * and a PUT against an application submitted by "someuser" must be
 * answered with FORBIDDEN.
 *
 * @throws Exception if the RM, a request, or an assertion fails
 */
@Test(timeout = 90000)
public void testUpdateAppPriority() throws Exception {
  client().addFilter(new LoggingFilter(System.out));

  if (!(rm.getResourceScheduler() instanceof CapacityScheduler)) {
    // till the fair scheduler modifications for priority is completed
    return;
  }

  CapacityScheduler scheduler =
      (CapacityScheduler) rm.getResourceScheduler();
  Configuration priorityConf = new Configuration();
  priorityConf.setInt(
      YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
  scheduler.setClusterMaxPriority(priorityConf);

  // default root queue allows anyone to have admin acl, so the
  // ADMINISTER_QUEUE acl is set explicitly on every queue
  CapacitySchedulerConfiguration queueConf =
      new CapacitySchedulerConfiguration();
  queueConf.setQueues("root", new String[] {"default", "test"});
  queueConf.setCapacity("root.default", 50.0f);
  queueConf.setCapacity("root.test", 50.0f);
  queueConf.setAcl("root", QueueACL.ADMINISTER_QUEUE, "someuser");
  queueConf.setAcl("root.default", QueueACL.ADMINISTER_QUEUE, "someuser");
  queueConf.setAcl("root.test", QueueACL.ADMINISTER_QUEUE, "someuser");
  rm.getResourceScheduler().reinitialize(queueConf, rm.getRMContext());

  rm.start();
  MockNM nodeManager = rm.registerNode("127.0.0.1:1234", 2048);

  String[] acceptTypes =
      {MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML};
  MediaType[] bodyTypes =
      {MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE};

  for (String acceptType : acceptTypes) {
    boolean expectJson = acceptType.equals(MediaType.APPLICATION_JSON);
    for (MediaType bodyType : bodyTypes) {
      RMApp ownApp = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
      nodeManager.nodeHeartbeat(true);

      int modifiedPriority = 8;
      AppPriority priority = new AppPriority(modifiedPriority);
      // JSON bodies are sent as a pre-marshalled string, XML as the object
      Object entity = bodyType.equals(MediaType.APPLICATION_JSON_TYPE)
          ? appPriorityToJSON(priority)
          : priority;

      String ownAppId = ownApp.getApplicationId().toString();
      ClientResponse reply =
          this.constructWebResource("apps", ownAppId, "priority")
              .entity(entity, bodyType)
              .accept(acceptType)
              .put(ClientResponse.class);

      if (!isAuthenticationEnabled()) {
        assertEquals(Status.UNAUTHORIZED, reply.getClientResponseStatus());
        continue;
      }

      // the PUT reply carries the updated priority
      assertEquals(Status.OK, reply.getClientResponseStatus());
      if (expectJson) {
        verifyAppPriorityJson(reply, modifiedPriority);
      } else {
        verifyAppPriorityXML(reply, modifiedPriority);
      }

      // and so does a subsequent GET
      reply = this.constructWebResource("apps", ownAppId, "priority")
          .accept(acceptType)
          .get(ClientResponse.class);
      assertEquals(Status.OK, reply.getClientResponseStatus());
      if (expectJson) {
        verifyAppPriorityJson(reply, modifiedPriority);
      } else {
        verifyAppPriorityXML(reply, modifiedPriority);
      }

      // check unauthorized
      RMApp foreignApp = rm.submitApp(CONTAINER_MB, "", "someuser");
      nodeManager.nodeHeartbeat(true);
      reply = this
          .constructWebResource("apps",
              foreignApp.getApplicationId().toString(), "priority")
          .entity(entity, bodyType)
          .accept(acceptType)
          .put(ClientResponse.class);
      assertEquals(Status.FORBIDDEN, reply.getClientResponseStatus());
    }
  }
  rm.stop();
}
/**
 * Exercises PUT on "apps/{appid}/queue" for every combination of accepted
 * response type and request body type (JSON and XML), asking for the
 * application to be moved to the "test" queue.
 *
 * When {@code isAuthenticationEnabled()} is false the PUT is expected to be
 * answered with UNAUTHORIZED and the rest of that iteration is skipped.
 * Otherwise the reply and the application itself must report the target
 * queue, and a PUT against an application submitted by "someuser" must be
 * answered with FORBIDDEN while that application keeps its original queue.
 *
 * @throws Exception if the RM, a request, or an assertion fails
 */
@Test(timeout = 90000)
public void testAppMove() throws Exception {
  client().addFilter(new LoggingFilter(System.out));
  boolean usingCapacityScheduler =
      rm.getResourceScheduler() instanceof CapacityScheduler;

  // default root queue allows anyone to have admin acl, so the
  // ADMINISTER_QUEUE acl is set explicitly on every queue
  CapacitySchedulerConfiguration queueConf =
      new CapacitySchedulerConfiguration();
  queueConf.setQueues("root", new String[] {"default", "test"});
  queueConf.setCapacity("root.default", 50.0f);
  queueConf.setCapacity("root.test", 50.0f);
  queueConf.setAcl("root", QueueACL.ADMINISTER_QUEUE, "someuser");
  queueConf.setAcl("root.default", QueueACL.ADMINISTER_QUEUE, "someuser");
  queueConf.setAcl("root.test", QueueACL.ADMINISTER_QUEUE, "someuser");
  rm.getResourceScheduler().reinitialize(queueConf, rm.getRMContext());

  rm.start();
  MockNM nodeManager = rm.registerNode("127.0.0.1:1234", 2048);

  String[] acceptTypes =
      {MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML};
  MediaType[] bodyTypes =
      {MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE};

  for (String acceptType : acceptTypes) {
    for (MediaType bodyType : bodyTypes) {
      RMApp ownApp = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
      nodeManager.nodeHeartbeat(true);

      AppQueue targetQueue = new AppQueue("test");
      // JSON bodies are sent as a pre-marshalled string, XML as the object
      Object entity = bodyType.equals(MediaType.APPLICATION_JSON_TYPE)
          ? appQueueToJSON(targetQueue)
          : targetQueue;

      ClientResponse reply = this
          .constructWebResource("apps",
              ownApp.getApplicationId().toString(), "queue")
          .entity(entity, bodyType)
          .accept(acceptType)
          .put(ClientResponse.class);

      if (!isAuthenticationEnabled()) {
        assertEquals(Status.UNAUTHORIZED, reply.getClientResponseStatus());
        continue;
      }

      assertEquals(Status.OK, reply.getClientResponseStatus());
      String expectedQueue = usingCapacityScheduler ? "test" : "root.test";
      if (acceptType.equals(MediaType.APPLICATION_JSON)) {
        verifyAppQueueJson(reply, expectedQueue);
      } else {
        verifyAppQueueXML(reply, expectedQueue);
      }
      Assert.assertEquals(expectedQueue, ownApp.getQueue());

      // check unauthorized
      RMApp foreignApp = rm.submitApp(CONTAINER_MB, "", "someuser");
      nodeManager.nodeHeartbeat(true);
      reply = this
          .constructWebResource("apps",
              foreignApp.getApplicationId().toString(), "queue")
          .entity(entity, bodyType)
          .accept(acceptType)
          .put(ClientResponse.class);
      assertEquals(Status.FORBIDDEN, reply.getClientResponseStatus());

      // the rejected move must leave the application where it was
      String untouchedQueue =
          usingCapacityScheduler ? "default" : "root.someuser";
      Assert.assertEquals(untouchedQueue, foreignApp.getQueue());
    }
  }
  rm.stop();
}
/**
 * Marshals an {@code AppPriority} to a JSON string using a
 * {@code JSONJAXBContext} created for {@code AppPriority.class}.
 *
 * @param targetPriority the priority object to marshal
 * @return the JSON text produced by the marshaller
 * @throws Exception if the JAXB context cannot be created or marshalling
 *           fails
 */
protected static String appPriorityToJSON(AppPriority targetPriority)
    throws Exception {
  StringWriter buffer = new StringWriter();
  new JSONJAXBContext(AppPriority.class)
      .createJSONMarshaller()
      .marshallToJSON(targetPriority, buffer);
  return buffer.toString();
}
/**
 * Marshals an {@code AppQueue} to a JSON string using a
 * {@code JSONJAXBContext} created for {@code AppQueue.class}.
 *
 * @param targetQueue the queue object to marshal
 * @return the JSON text produced by the marshaller
 * @throws Exception if the JAXB context cannot be created or marshalling
 *           fails
 */
protected static String appQueueToJSON(AppQueue targetQueue) throws Exception {
  StringWriter buffer = new StringWriter();
  new JSONJAXBContext(AppQueue.class)
      .createJSONMarshaller()
      .marshallToJSON(targetQueue, buffer);
  return buffer.toString();
}
/**
 * Asserts that a response is JSON, holds exactly one top-level element and
 * that its "priority" value equals the expected priority.
 *
 * @param response the response to inspect; its entity is consumed
 * @param expectedPriority the priority the response must report
 * @throws JSONException if "priority" is missing or is not an integer
 */
protected static void verifyAppPriorityJson(ClientResponse response,
    int expectedPriority) throws JSONException {
  assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
  JSONObject payload = response.getEntity(JSONObject.class);
  assertEquals("incorrect number of elements", 1, payload.length());
  assertEquals(expectedPriority, payload.getInt("priority"));
}
/**
 * Asserts that a response is XML, contains exactly one
 * "applicationpriority" element and that the "priority" value read from it
 * equals the expected priority.
 *
 * @param response the response to inspect; its entity is read as a string
 * @param expectedPriority the priority the response must report
 * @throws ParserConfigurationException if no XML parser can be created
 * @throws IOException if reading the XML text fails
 * @throws SAXException if the response body is not well-formed XML
 */
protected static void verifyAppPriorityXML(ClientResponse response,
    int expectedPriority)
    throws ParserConfigurationException, IOException, SAXException {
  assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
  String body = response.getEntity(String.class);

  Document parsed = DocumentBuilderFactory.newInstance()
      .newDocumentBuilder()
      .parse(new InputSource(new StringReader(body)));

  NodeList matches = parsed.getElementsByTagName("applicationpriority");
  assertEquals("incorrect number of elements", 1, matches.getLength());
  Element priorityElement = (Element) matches.item(0);
  assertEquals(expectedPriority,
      WebServicesTestUtils.getXmlInt(priorityElement, "priority"));
}
/**
 * Asserts that a response is JSON, holds exactly one top-level element and
 * that its "queue" value equals the expected queue name.
 *
 * @param response the response to inspect; its entity is consumed
 * @param queue the queue name the response must report
 * @throws JSONException if "queue" is missing from the response
 */
protected static void verifyAppQueueJson(ClientResponse response,
    String queue) throws JSONException {
  assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
  JSONObject payload = response.getEntity(JSONObject.class);
  assertEquals("incorrect number of elements", 1, payload.length());
  assertEquals(queue, payload.getString("queue"));
}
/**
 * Asserts that a response is XML, contains exactly one "appqueue" element
 * and that the "queue" value read from it equals the expected queue name.
 *
 * @param response the response to inspect; its entity is read as a string
 * @param queue the queue name the response must report
 * @throws ParserConfigurationException if no XML parser can be created
 * @throws IOException if reading the XML text fails
 * @throws SAXException if the response body is not well-formed XML
 */
protected static void verifyAppQueueXML(ClientResponse response,
    String queue)
    throws ParserConfigurationException, IOException, SAXException {
  assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
  String body = response.getEntity(String.class);

  Document parsed = DocumentBuilderFactory.newInstance()
      .newDocumentBuilder()
      .parse(new InputSource(new StringReader(body)));

  NodeList matches = parsed.getElementsByTagName("appqueue");
  assertEquals("incorrect number of elements", 1, matches.getLength());
  Element queueElement = (Element) matches.item(0);
  assertEquals(queue,
      WebServicesTestUtils.getXmlString(queueElement, "queue"));
}
/**
 * Exercises the application timeout endpoints for every combination of
 * accepted response type and request body type (JSON and XML).
 *
 * Per iteration: a GET on "apps/{appid}/timeouts" for an application
 * submitted without a timeout, a PUT on "apps/{appid}/timeout" setting a
 * LIFETIME expiry 60 seconds ahead, a PUT with a null type and expiry that
 * must be answered with BAD_REQUEST, and a GET on
 * "apps/{appid}/timeouts/LIFETIME". When {@code isAuthenticationEnabled()}
 * is false the first PUT is expected to return UNAUTHORIZED and the rest of
 * the iteration is skipped.
 *
 * NOTE(review): the bodies of the two GET replies are only inspected when
 * JSON was requested; XML replies to those GETs are not verified here.
 *
 * @throws Exception if the RM, a request, or an assertion fails
 */
@Test(timeout = 90000)
public void testUpdateAppTimeout() throws Exception {
  client().addFilter(new LoggingFilter(System.out));
  rm.start();
  rm.registerNode("127.0.0.1:1234", 2048);

  String[] acceptTypes =
      {MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML};
  MediaType[] bodyTypes =
      {MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE};

  for (String acceptType : acceptTypes) {
    boolean expectJson = acceptType.contains(MediaType.APPLICATION_JSON);
    for (MediaType bodyType : bodyTypes) {
      // application submitted without timeout
      RMApp submitted = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
      ClientResponse reply = this
          .constructWebResource("apps",
              submitted.getApplicationId().toString(), "timeouts")
          .accept(acceptType)
          .get(ClientResponse.class);
      if (expectJson) {
        assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
        JSONObject timeouts =
            reply.getEntity(JSONObject.class).getJSONObject("timeouts");
        JSONArray timeoutList = timeouts.getJSONArray("timeout");
        verifyAppTimeoutJson(timeoutList.getJSONObject(0),
            ApplicationTimeoutType.LIFETIME, "UNLIMITED", -1);
      }

      // set a LIFETIME expiry timeOutFromNow seconds ahead of now
      long timeOutFromNow = 60;
      long expiryMillis = System.currentTimeMillis() + timeOutFromNow * 1000;
      String expireTime = Times.formatISO8601(expiryMillis);
      Object entity = getAppTimeoutInfoEntity(
          ApplicationTimeoutType.LIFETIME, bodyType, expireTime);
      reply = this
          .constructWebResource("apps",
              submitted.getApplicationId().toString(), "timeout")
          .entity(entity, bodyType)
          .accept(acceptType)
          .put(ClientResponse.class);

      if (!isAuthenticationEnabled()) {
        assertEquals(Status.UNAUTHORIZED, reply.getClientResponseStatus());
        continue;
      }

      assertEquals(Status.OK, reply.getClientResponseStatus());
      if (expectJson) {
        verifyAppTimeoutJson(reply, ApplicationTimeoutType.LIFETIME,
            expireTime, timeOutFromNow);
      } else {
        verifyAppTimeoutXML(reply, ApplicationTimeoutType.LIFETIME,
            expireTime, timeOutFromNow);
      }

      // verify for negative cases
      entity = getAppTimeoutInfoEntity(null, bodyType, null);
      reply = this
          .constructWebResource("apps",
              submitted.getApplicationId().toString(), "timeout")
          .entity(entity, bodyType)
          .accept(acceptType)
          .put(ClientResponse.class);
      assertEquals(Status.BAD_REQUEST, reply.getClientResponseStatus());

      // invoke get
      reply = this
          .constructWebResource("apps",
              submitted.getApplicationId().toString(), "timeouts",
              ApplicationTimeoutType.LIFETIME.toString())
          .accept(acceptType)
          .get(ClientResponse.class);
      assertEquals(Status.OK, reply.getClientResponseStatus());
      if (expectJson) {
        verifyAppTimeoutJson(reply, ApplicationTimeoutType.LIFETIME,
            expireTime, timeOutFromNow);
      }
    }
  }
  rm.stop();
}
/**
 * Builds the request entity for a timeout update.
 *
 * @param type timeout type to set on the update; passed through as given
 * @param contentType media type the request body will be sent as
 * @param expireTime expiry time to set on the update; passed through as
 *          given
 * @return the JSON string produced by {@code appTimeoutToJSON} when
 *         {@code contentType} equals {@code APPLICATION_JSON_TYPE},
 *         otherwise the {@code AppTimeoutInfo} object itself
 * @throws Exception if JSON marshalling fails
 */
private Object getAppTimeoutInfoEntity(ApplicationTimeoutType type,
    MediaType contentType, String expireTime) throws Exception {
  AppTimeoutInfo update = new AppTimeoutInfo();
  update.setTimeoutType(type);
  update.setExpiryTime(expireTime);

  if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
    return appTimeoutToJSON(update);
  }
  return update;
}
/**
 * Asserts that a response is JSON with exactly one top-level element and
 * hands its "timeout" object to the {@code JSONObject} overload for the
 * field-level checks.
 *
 * @param response the response to inspect; its entity is consumed
 * @param type expected timeout type
 * @param expireTime expected expiry time string
 * @param timeOutFromNow upper bound, in seconds, for the remaining time
 * @throws JSONException if an expected key is missing
 */
protected static void verifyAppTimeoutJson(ClientResponse response,
    ApplicationTimeoutType type, String expireTime, long timeOutFromNow)
    throws JSONException {
  assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
  JSONObject wrapper = response.getEntity(JSONObject.class);
  assertEquals("incorrect number of elements", 1, wrapper.length());
  verifyAppTimeoutJson(wrapper.getJSONObject("timeout"), type, expireTime,
      timeOutFromNow);
}
/**
 * Asserts that a timeout JSON object has exactly three elements, that its
 * "type" and "expiryTime" match the expected values and that its
 * "remainingTimeInSeconds" does not exceed the given bound.
 *
 * @param json the timeout object to inspect
 * @param type expected timeout type, compared through {@code toString()}
 * @param expireTime expected expiry time string
 * @param timeOutFromNow inclusive upper bound, in seconds, for the
 *          remaining time
 * @throws JSONException if an expected key is missing or has the wrong type
 */
protected static void verifyAppTimeoutJson(JSONObject json,
    ApplicationTimeoutType type, String expireTime, long timeOutFromNow)
    throws JSONException {
  assertEquals("incorrect number of elements", 3, json.length());

  String expectedType = type.toString();
  assertEquals(expectedType, json.getString("type"));
  assertEquals(expireTime, json.getString("expiryTime"));

  long remainingSeconds = json.getLong("remainingTimeInSeconds");
  assertTrue(remainingSeconds <= timeOutFromNow);
}
/**
 * Asserts that a response is XML, contains exactly one "timeout" element,
 * that the element's "type" and "expiryTime" match the expected values and
 * that its "remainingTimeInSeconds" does not exceed the given bound.
 *
 * The bound is inclusive, matching the JSON verifier in this class: a reply
 * whose remaining time equals the requested timeout is accepted.
 *
 * @param response the response to inspect; its entity is read as a string
 * @param type expected timeout type, compared through {@code toString()}
 * @param expireTime expected expiry time string
 * @param timeOutFromNow inclusive upper bound, in seconds, for the
 *          remaining time
 * @throws ParserConfigurationException if no XML parser can be created
 * @throws IOException if reading the XML text fails
 * @throws SAXException if the response body is not well-formed XML
 */
protected static void verifyAppTimeoutXML(ClientResponse response,
    ApplicationTimeoutType type, String expireTime, long timeOutFromNow)
    throws ParserConfigurationException, IOException, SAXException {
  assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
  String xml = response.getEntity(String.class);
  DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
  DocumentBuilder db = dbf.newDocumentBuilder();
  InputSource is = new InputSource();
  is.setCharacterStream(new StringReader(xml));
  Document dom = db.parse(is);
  NodeList nodes = dom.getElementsByTagName("timeout");
  assertEquals("incorrect number of elements", 1, nodes.getLength());
  Element element = (Element) nodes.item(0);
  assertEquals(type.toString(),
      WebServicesTestUtils.getXmlString(element, "type"));
  assertEquals(expireTime,
      WebServicesTestUtils.getXmlString(element, "expiryTime"));
  // inclusive comparison, same as the JSON verifier
  assertTrue(WebServicesTestUtils.getXmlLong(element,
      "remainingTimeInSeconds") <= timeOutFromNow);
}
/**
 * Marshals an {@code AppTimeoutInfo} to a JSON string using a
 * {@code JSONJAXBContext} configured with natural notation and root
 * unwrapping disabled.
 *
 * The object is marshalled once, into the returned string; nothing is
 * written to standard output.
 *
 * @param timeout the timeout object to marshal
 * @return the JSON text produced by the marshaller
 * @throws Exception if the JAXB context cannot be created or marshalling
 *           fails
 */
protected static String appTimeoutToJSON(AppTimeoutInfo timeout)
    throws Exception {
  StringWriter sw = new StringWriter();
  JSONJAXBContext ctx = new JSONJAXBContext(
      JSONConfiguration.natural().rootUnwrapping(false).build(),
      AppTimeoutInfo.class);
  JSONMarshaller jm = ctx.createJSONMarshaller();
  jm.marshallToJSON(timeout, sw);
  return sw.toString();
}
}