blob: fbcf9e9eb374e9af680c8abb20a778dd8258511e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import com.google.inject.Guice;
import com.google.inject.Singleton;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.server.webapp.LogServlet;
import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
public class TestAHSWebServices extends JerseyTestBase {
private static ApplicationHistoryClientService historyClientService;
private static AHSWebServices ahsWebservice;
private static final String[] USERS = new String[]{"foo", "bar"};
private static final int MAX_APPS = 6;
private static Configuration conf;
private static FileSystem fs;
private static final String remoteLogRootDir = "target/logs/";
private static final String rootLogDir = "target/LocalLogs";
private static final String NM_WEBADDRESS = "test-nm-web-address:9999";
private static final String NM_ID = "test:1234";
@BeforeAll
public static void setupClass() throws Exception {
conf = new YarnConfiguration();
TimelineStore store =
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
aclsManager.setTimelineStore(store);
TimelineDataManager dataManager =
new TimelineDataManager(store, aclsManager);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "foo");
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
dataManager.init(conf);
ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf);
ApplicationHistoryManagerOnTimelineStore historyManager =
new ApplicationHistoryManagerOnTimelineStore(dataManager, appAclsManager);
historyManager.init(conf);
historyClientService = new ApplicationHistoryClientService(historyManager) {
@Override
protected void serviceStart() throws Exception {
// Do Nothing
}
};
historyClientService.init(conf);
historyClientService.start();
ahsWebservice = new AHSWebServices(historyClientService, conf);
LogServlet logServlet = spy(ahsWebservice.getLogServlet());
doReturn(null).when(logServlet).getNMWebAddressFromRM(any());
doReturn(NM_WEBADDRESS).when(logServlet).getNMWebAddressFromRM(NM_ID);
ahsWebservice.setLogServlet(logServlet);
fs = FileSystem.get(conf);
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule()));
}
@AfterAll
public static void tearDownClass() throws Exception {
if (historyClientService != null) {
historyClientService.stop();
}
fs.delete(new Path(remoteLogRootDir), true);
fs.delete(new Path(rootLogDir), true);
}
public static Collection<Object[]> rounds() {
return Arrays.asList(new Object[][]{{0}, {1}});
}
private static class WebServletModule extends ServletModule {
@Override
protected void configureServlets() {
bind(JAXBContextResolver.class);
bind(AHSWebServices.class).toInstance(ahsWebservice);
bind(GenericExceptionHandler.class);
bind(ApplicationBaseProtocol.class).toInstance(historyClientService);
serve("/*").with(GuiceContainer.class);
filter("/*").through(TestSimpleAuthFilter.class);
}
}
@BeforeEach
public void setUp() throws Exception {
super.setUp();
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule()));
}
@Singleton
public static class TestSimpleAuthFilter extends AuthenticationFilter {
@Override
protected Properties getConfiguration(String configPrefix,
FilterConfig filterConfig) throws ServletException {
Properties properties =
super.getConfiguration(configPrefix, filterConfig);
properties.put(AuthenticationFilter.AUTH_TYPE, "simple");
properties.put(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "false");
return properties;
}
}
public TestAHSWebServices() {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.yarn.server.applicationhistoryservice.webapp")
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.contextPath("jersey-guice-filter").servletPath("/").build());
}
@MethodSource("rounds")
@ParameterizedTest
void testInvalidApp(int round) {
ApplicationId appId = ApplicationId.newInstance(0, MAX_APPS + 1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.path(appId.toString())
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertResponseStatusCode("404 not found expected",
Status.NOT_FOUND, response.getStatusInfo());
}
@MethodSource("rounds")
@ParameterizedTest
void testInvalidAttempt(int round) {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, MAX_APPS + 1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.path(appId.toString()).path("appattempts")
.path(appAttemptId.toString())
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
if (round == 1) {
assertResponseStatusCode(Status.FORBIDDEN, response.getStatusInfo());
return;
}
assertResponseStatusCode("404 not found expected",
Status.NOT_FOUND, response.getStatusInfo());
}
@MethodSource("rounds")
@ParameterizedTest
void testInvalidContainer(int round) throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newContainerId(appAttemptId,
MAX_APPS + 1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.path(appId.toString()).path("appattempts")
.path(appAttemptId.toString()).path("containers")
.path(containerId.toString())
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
if (round == 1) {
assertResponseStatusCode(Status.FORBIDDEN, response.getStatusInfo());
return;
}
assertResponseStatusCode("404 not found expected",
Status.NOT_FOUND, response.getStatusInfo());
}
@MethodSource("rounds")
@ParameterizedTest
void testInvalidUri(int round) throws JSONException, Exception {
WebResource r = resource();
String responseStr = "";
try {
responseStr =
r.path("ws").path("v1").path("applicationhistory").path("bogus")
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON).get(String.class);
fail("should have thrown exception on invalid uri");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertResponseStatusCode(Status.NOT_FOUND, response.getStatusInfo());
WebServicesTestUtils.checkStringMatch(
"error string exists and shouldn't", "", responseStr);
}
}
@MethodSource("rounds")
@ParameterizedTest
void testInvalidUri2(int round) throws JSONException, Exception {
WebResource r = resource();
String responseStr = "";
try {
responseStr = r.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON).get(String.class);
fail("should have thrown exception on invalid uri");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertResponseStatusCode(Status.NOT_FOUND, response.getStatusInfo());
WebServicesTestUtils.checkStringMatch(
"error string exists and shouldn't", "", responseStr);
}
}
@MethodSource("rounds")
@ParameterizedTest
void testInvalidAccept(int round) throws JSONException, Exception {
WebResource r = resource();
String responseStr = "";
try {
responseStr =
r.path("ws").path("v1").path("applicationhistory")
.queryParam("user.name", USERS[round])
.accept(MediaType.TEXT_PLAIN).get(String.class);
fail("should have thrown exception on invalid uri");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertResponseStatusCode(Status.INTERNAL_SERVER_ERROR,
response.getStatusInfo());
WebServicesTestUtils.checkStringMatch(
"error string exists and shouldn't", "", responseStr);
}
}
@MethodSource("rounds")
@ParameterizedTest
void testAbout(int round) throws Exception {
WebResource r = resource();
ClientResponse response = r
.path("ws").path("v1").path("applicationhistory").path("about")
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
response.getType().toString());
TimelineAbout actualAbout = response.getEntity(TimelineAbout.class);
TimelineAbout expectedAbout =
TimelineUtils.createTimelineAbout("Generic History Service API");
assertNotNull(
actualAbout, "Timeline service about response is null");
assertEquals(expectedAbout.getAbout(), actualAbout.getAbout());
assertEquals(expectedAbout.getTimelineServiceVersion(),
actualAbout.getTimelineServiceVersion());
assertEquals(expectedAbout.getTimelineServiceBuildVersion(),
actualAbout.getTimelineServiceBuildVersion());
assertEquals(expectedAbout.getTimelineServiceVersionBuiltOn(),
actualAbout.getTimelineServiceVersionBuiltOn());
assertEquals(expectedAbout.getHadoopVersion(),
actualAbout.getHadoopVersion());
assertEquals(expectedAbout.getHadoopBuildVersion(),
actualAbout.getHadoopBuildVersion());
assertEquals(expectedAbout.getHadoopVersionBuiltOn(),
actualAbout.getHadoopVersionBuiltOn());
}
@MethodSource("rounds")
@ParameterizedTest
void testAppsQuery(int round) throws Exception {
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.queryParam("state", YarnApplicationState.FINISHED.toString())
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals(1, json.length(), "incorrect number of elements");
JSONObject apps = json.getJSONObject("apps");
assertEquals(1, apps.length(), "incorrect number of elements");
JSONArray array = apps.getJSONArray("app");
assertEquals(MAX_APPS, array.length(), "incorrect number of elements");
}
@MethodSource("rounds")
@ParameterizedTest
void testQueueQuery(int round) throws Exception {
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.queryParam("queue", "test queue")
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertResponseStatusCode(Status.OK, response.getStatusInfo());
assertEquals(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals(1, json.length(), "incorrect number of elements");
JSONObject apps = json.getJSONObject("apps");
assertEquals(1, apps.length(), "incorrect number of elements");
JSONArray array = apps.getJSONArray("app");
assertEquals(MAX_APPS - 1,
array.length(),
"incorrect number of elements");
}
@MethodSource("rounds")
@ParameterizedTest
void testSingleApp(int round) throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.path(appId.toString())
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals(1, json.length(), "incorrect number of elements");
JSONObject app = json.getJSONObject("app");
assertEquals(appId.toString(), app.getString("appId"));
assertEquals("test app", app.get("name"));
assertEquals(round == 0 ? "test diagnostics info" : "",
app.get("diagnosticsInfo"));
assertEquals(Integer.MAX_VALUE + 1L, app.get("submittedTime"));
assertEquals("test queue", app.get("queue"));
assertEquals("user1", app.get("user"));
assertEquals("test app type", app.get("type"));
assertEquals(FinalApplicationStatus.UNDEFINED.toString(),
app.get("finalAppStatus"));
assertEquals(YarnApplicationState.FINISHED.toString(), app.get("appState"));
assertNotNull(app.get("aggregateResourceAllocation"),
"Aggregate resource allocation is null");
assertNotNull(app.get("aggregatePreemptedResourceAllocation"),
"Aggregate Preempted Resource Allocation is null");
}
@MethodSource("rounds")
@ParameterizedTest
void testMultipleAttempts(int round) throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.path(appId.toString()).path("appattempts")
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
if (round == 1) {
assertResponseStatusCode(Status.FORBIDDEN, response.getStatusInfo());
return;
}
assertEquals(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals(1, json.length(), "incorrect number of elements");
JSONObject appAttempts = json.getJSONObject("appAttempts");
assertEquals(1, appAttempts.length(), "incorrect number of elements");
JSONArray array = appAttempts.getJSONArray("appAttempt");
assertEquals(MAX_APPS, array.length(), "incorrect number of elements");
}
@MethodSource("rounds")
@ParameterizedTest
void testSingleAttempt(int round) throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.path(appId.toString()).path("appattempts")
.path(appAttemptId.toString())
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
if (round == 1) {
assertResponseStatusCode(Status.FORBIDDEN, response.getStatusInfo());
return;
}
assertEquals(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals(1, json.length(), "incorrect number of elements");
JSONObject appAttempt = json.getJSONObject("appAttempt");
assertEquals(appAttemptId.toString(), appAttempt.getString("appAttemptId"));
assertEquals("test host", appAttempt.getString("host"));
assertEquals("test diagnostics info",
appAttempt.getString("diagnosticsInfo"));
assertEquals("test tracking url", appAttempt.getString("trackingUrl"));
assertEquals(YarnApplicationAttemptState.FINISHED.toString(),
appAttempt.get("appAttemptState"));
}
@MethodSource("rounds")
@ParameterizedTest
void testMultipleContainers(int round) throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.path(appId.toString()).path("appattempts")
.path(appAttemptId.toString()).path("containers")
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
if (round == 1) {
assertResponseStatusCode(Status.FORBIDDEN, response.getStatusInfo());
return;
}
assertEquals(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals(1, json.length(), "incorrect number of elements");
JSONObject containers = json.getJSONObject("containers");
assertEquals(1, containers.length(), "incorrect number of elements");
JSONArray array = containers.getJSONArray("container");
assertEquals(MAX_APPS, array.length(), "incorrect number of elements");
}
@MethodSource("rounds")
@ParameterizedTest
void testSingleContainer(int round) throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("applicationhistory").path("apps")
.path(appId.toString()).path("appattempts")
.path(appAttemptId.toString()).path("containers")
.path(containerId.toString())
.queryParam("user.name", USERS[round])
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
if (round == 1) {
assertResponseStatusCode(Status.FORBIDDEN, response.getStatusInfo());
return;
}
assertEquals(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals(1, json.length(), "incorrect number of elements");
JSONObject container = json.getJSONObject("container");
assertEquals(containerId.toString(), container.getString("containerId"));
assertEquals("test diagnostics info", container.getString("diagnosticsInfo"));
assertEquals("-1", container.getString("allocatedMB"));
assertEquals("-1", container.getString("allocatedVCores"));
assertEquals(NodeId.newInstance("test host", 100).toString(),
container.getString("assignedNodeId"));
assertEquals("-1", container.getString("priority"));
Configuration conf = new YarnConfiguration();
assertEquals(WebAppUtils.getHttpSchemePrefix(conf) +
WebAppUtils.getAHSWebAppURLWithoutScheme(conf) +
"/applicationhistory/logs/test host:100/container_0_0001_01_000001/" +
"container_0_0001_01_000001/user1", container.getString("logUrl"));
assertEquals(ContainerState.COMPLETE.toString(),
container.getString("containerState"));
}
@MethodSource("rounds")
@ParameterizedTest
@Timeout(10000)
void testContainerLogsForFinishedApps(int round) throws Exception {
String fileName = "syslog";
String user = "user1";
NodeId nodeId = NodeId.newInstance("test host", 100);
NodeId nodeId2 = NodeId.newInstance("host2", 1234);
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, appId, Collections.singletonMap(containerId1,
"Hello." + containerId1),
nodeId, fileName, user, true);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, appId, Collections.singletonMap(containerId100,
"Hello." + containerId100),
nodeId2, fileName, user, false);
// test whether we can find container log from remote diretory if
// the containerInfo for this container could be fetched from AHS.
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1.toString()).path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1));
// Do the same test with new API
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs").path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1));
// test whether we can find container log from remote diretory if
// the containerInfo for this container could not be fetched from AHS.
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId100.toString()).path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId100));
// Do the same test with new API
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId100.toString()).path("logs").path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId100));
// create an application which can not be found from AHS
ApplicationId appId100 = ApplicationId.newInstance(0, 100);
ApplicationAttemptId appAttemptId100 = ApplicationAttemptId.newInstance(
appId100, 1);
ContainerId containerId1ForApp100 = ContainerId.newContainerId(
appAttemptId100, 1);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, appId100,
Collections.singletonMap(containerId1ForApp100,
"Hello." + containerId1ForApp100),
nodeId, fileName, user, true);
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1ForApp100));
int fullTextSize = responseText.getBytes().length;
String tailEndSeparator = StringUtils.repeat("*",
"End of LogType:syslog".length() + 50) + "\n\n";
int tailTextSize = "\nEnd of LogType:syslog\n".getBytes().length
+ tailEndSeparator.getBytes().length;
String logMessage = "Hello." + containerId1ForApp100;
int fileContentSize = logMessage.getBytes().length;
// specify how many bytes we should get from logs
// if we specify a position number, it would get the first n bytes from
// container log
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam("size", "5")
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(responseText.getBytes().length,
(fullTextSize - fileContentSize) + 5);
assertTrue(fullTextSize >= responseText.getBytes().length);
assertEquals(new String(responseText.getBytes(),
(fullTextSize - fileContentSize - tailTextSize), 5),
new String(logMessage.getBytes(), 0, 5));
// specify how many bytes we should get from logs
// if we specify a negative number, it would get the last n bytes from
// container log
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam("size", "-5")
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(responseText.getBytes().length,
(fullTextSize - fileContentSize) + 5);
assertTrue(fullTextSize >= responseText.getBytes().length);
assertEquals(new String(responseText.getBytes(),
(fullTextSize - fileContentSize - tailTextSize), 5),
new String(logMessage.getBytes(), fileContentSize - 5, 5));
// specify the bytes which is larger than the actual file size,
// we would get the full logs
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam("size", "10000")
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertThat(responseText.getBytes()).hasSize(fullTextSize);
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam("size", "-10000")
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertThat(responseText.getBytes()).hasSize(fullTextSize);
}
@MethodSource("rounds")
@ParameterizedTest
@Timeout(10000)
void testContainerLogsForRunningApps(int round) throws Exception {
String fileName = "syslog";
String user = "user1";
ApplicationId appId = ApplicationId.newInstance(
1234, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
WebResource r = resource();
URI requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1.toString()).path(fileName)
.queryParam("user.name", user).getURI();
String redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains("test:1234"));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
// If we specify NM id, we would re-direct the request
// to this NM's Web Address.
requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam(YarnWebServiceParams.NM_ID, NM_ID)
.getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains(NM_WEBADDRESS));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
// Test with new API
requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs").path(fileName)
.queryParam("user.name", user).getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains("test:1234"));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs").path(fileName)
.queryParam("user.name", user)
.queryParam(YarnWebServiceParams.NM_ID, NM_ID)
.getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains(NM_WEBADDRESS));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
// If we can not container information from ATS, we would try to
// get aggregated log from remote FileSystem.
ContainerId containerId1000 = ContainerId.newContainerId(
appAttemptId, 1000);
String content = "Hello." + containerId1000;
NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, appId, Collections.singletonMap(containerId1000, content),
nodeId, fileName, user, true);
r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1000.toString()).path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertTrue(responseText.contains(content));
// Also test whether we output the empty local container log, and give
// the warning message.
assertTrue(responseText.contains("LogAggregationType: "
+ ContainerLogAggregationType.LOCAL));
assertTrue(
responseText.contains(LogWebServiceUtils.getNoRedirectWarning()));
// If we can not container information from ATS, and we specify the NM id,
// but we can not get nm web address, we would still try to
// get aggregated log from remote FileSystem.
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1000.toString()).path(fileName)
.queryParam(YarnWebServiceParams.NM_ID, "invalid-nm:1234")
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains(content));
assertTrue(responseText.contains("LogAggregationType: "
+ ContainerLogAggregationType.LOCAL));
assertTrue(
responseText.contains(LogWebServiceUtils.getNoRedirectWarning()));
// If this is the redirect request, we would not re-direct the request
// back and get the aggregated logs.
String content1 = "Hello." + containerId1;
NodeId nodeId1 = NodeId.fromString(NM_ID);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, appId, Collections.singletonMap(containerId1, content1),
nodeId1, fileName, user, true);
response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs").path(fileName)
.queryParam("user.name", user)
.queryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE, "true")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains(content1));
assertTrue(responseText.contains("LogAggregationType: "
+ ContainerLogAggregationType.AGGREGATED));
}
@MethodSource("rounds")
@ParameterizedTest
@Timeout(10000)
void testContainerLogsMetaForRunningApps(int round) throws Exception {
String user = "user1";
ApplicationId appId = ApplicationId.newInstance(
1234, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
WebResource r = resource();
// If we specify the NMID, we re-direct the request by using
// the NM's web address
URI requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs")
.queryParam("user.name", user)
.queryParam(YarnWebServiceParams.NM_ID, NM_ID)
.getURI();
String redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains(NM_WEBADDRESS));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs"));
// If we do not specify the NodeId but can get Container information
// from ATS, we re-direct the request to the node manager
// who runs the container.
requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs")
.queryParam("user.name", user).getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains("test:1234"));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs"));
// If we can not container information from ATS,
// and not specify nodeId,
// we would try to get aggregated log meta from remote FileSystem.
ContainerId containerId1000 = ContainerId.newContainerId(
appAttemptId, 1000);
String fileName = "syslog";
String content = "Hello." + containerId1000;
NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, appId, Collections.singletonMap(containerId1000, content),
nodeId, fileName, user, true);
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1000.toString()).path("logs")
.queryParam("user.name", user)
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
List<ContainerLogsInfo> responseText = response.getEntity(new GenericType<
List<ContainerLogsInfo>>(){
});
assertTrue(responseText.size() == 2);
for (ContainerLogsInfo logInfo : responseText) {
if (logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
List<ContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertThat(logMeta.get(0).getFileName()).isEqualTo(fileName);
assertThat(logMeta.get(0).getFileSize()).isEqualTo(String.valueOf(
content.length()));
} else {
assertEquals(logInfo.getLogType(),
ContainerLogAggregationType.LOCAL.toString());
}
}
// If we can not container information from ATS,
// and we specify NM id, but can not find NM WebAddress for this nodeId,
// we would still try to get aggregated log meta from remote FileSystem.
response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1000.toString()).path("logs")
.queryParam(YarnWebServiceParams.NM_ID, "invalid-nm:1234")
.queryParam("user.name", user)
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
responseText = response.getEntity(new GenericType<
List<ContainerLogsInfo>>(){
});
assertTrue(responseText.size() == 2);
for (ContainerLogsInfo logInfo : responseText) {
if (logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
List<ContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertThat(logMeta.get(0).getFileName()).isEqualTo(fileName);
assertThat(logMeta.get(0).getFileSize()).isEqualTo(String.valueOf(
content.length()));
} else {
assertThat(logInfo.getLogType()).isEqualTo(
ContainerLogAggregationType.LOCAL.toString());
}
}
}
@MethodSource("rounds")
@ParameterizedTest
@Timeout(10000)
void testContainerLogsMetaForFinishedApps(int round) throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
String fileName = "syslog";
String user = "user1";
String content = "Hello." + containerId1;
NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, appId, Collections.singletonMap(containerId1, content),
nodeId, fileName, user, true);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs")
.queryParam("user.name", user)
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
List<ContainerLogsInfo> responseText = response.getEntity(new GenericType<
List<ContainerLogsInfo>>(){
});
assertTrue(responseText.size() == 1);
assertEquals(responseText.get(0).getLogType(),
ContainerLogAggregationType.AGGREGATED.toString());
List<ContainerLogFileInfo> logMeta = responseText.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertThat(logMeta.get(0).getFileName()).isEqualTo(fileName);
assertThat(logMeta.get(0).getFileSize()).isEqualTo(
String.valueOf(content.length()));
}
private static String getRedirectURL(String url) {
String redirectUrl = null;
try {
HttpURLConnection conn = (HttpURLConnection) new URL(url)
.openConnection();
// do not automatically follow the redirection
// otherwise we get too many redirections exception
conn.setInstanceFollowRedirects(false);
if (conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) {
redirectUrl = conn.getHeaderField("Location");
}
} catch (Exception e) {
// throw new RuntimeException(e);
}
return redirectUrl;
}
}