blob: 77020044a179e9fba602484ebdbd2fbbb0743557 [file] [log] [blame]
/** * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMContainerLogsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
@Path("/ws/v1/node")
public class NMWebServices {
private static final Logger LOG =
LoggerFactory.getLogger(NMWebServices.class);
private Context nmContext;
private ResourceView rview;
private WebApp webapp;
private static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private final String redirectWSUrl;
private final LogAggregationFileControllerFactory factory;
private @javax.ws.rs.core.Context
HttpServletRequest request;
private @javax.ws.rs.core.Context
HttpServletResponse response;
@javax.ws.rs.core.Context
UriInfo uriInfo;
@Inject
public NMWebServices(final Context nm, final ResourceView view,
final WebApp webapp) {
this.nmContext = nm;
this.rview = view;
this.webapp = webapp;
this.redirectWSUrl = this.nmContext.getConf().get(
YarnConfiguration.YARN_LOG_SERVER_WEBSERVICE_URL);
this.factory = new LogAggregationFileControllerFactory(
this.nmContext.getConf());
}
private void init() {
//clear content type
response.setContentType(null);
}
@GET
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public NodeInfo get() {
return getNodeInfo();
}
@GET
@Path("/info")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public NodeInfo getNodeInfo() {
init();
return new NodeInfo(this.nmContext, this.rview);
}
@GET
@Path("/apps")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public AppsInfo getNodeApps(@QueryParam("state") String stateQuery,
@QueryParam("user") String userQuery) {
init();
AppsInfo allApps = new AppsInfo();
for (Entry<ApplicationId, Application> entry : this.nmContext
.getApplications().entrySet()) {
AppInfo appInfo = new AppInfo(entry.getValue());
if (stateQuery != null && !stateQuery.isEmpty()) {
ApplicationState.valueOf(stateQuery);
if (!appInfo.getState().equalsIgnoreCase(stateQuery)) {
continue;
}
}
if (userQuery != null) {
if (userQuery.isEmpty()) {
String msg = "Error: You must specify a non-empty string for the user";
throw new BadRequestException(msg);
}
if (!appInfo.getUser().equals(userQuery)) {
continue;
}
}
allApps.add(appInfo);
}
return allApps;
}
@GET
@Path("/apps/{appid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public AppInfo getNodeApp(@PathParam("appid") String appId) {
init();
ApplicationId id = WebAppUtils.parseApplicationId(recordFactory, appId);
Application app = this.nmContext.getApplications().get(id);
if (app == null) {
throw new NotFoundException("app with id " + appId + " not found");
}
return new AppInfo(app);
}
@GET
@Path("/containers")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public ContainersInfo getNodeContainers(@javax.ws.rs.core.Context
HttpServletRequest hsr) {
init();
ContainersInfo allContainers = new ContainersInfo();
for (Entry<ContainerId, Container> entry : this.nmContext.getContainers()
.entrySet()) {
if (entry.getValue() == null) {
// just skip it
continue;
}
ContainerInfo info = new ContainerInfo(this.nmContext, entry.getValue(),
uriInfo.getBaseUri().toString(), webapp.name(), hsr.getRemoteUser());
allContainers.add(info);
}
return allContainers;
}
@GET
@Path("/containers/{containerid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context
HttpServletRequest hsr, @PathParam("containerid") String id) {
ContainerId containerId = null;
init();
try {
containerId = ContainerId.fromString(id);
} catch (Exception e) {
throw new BadRequestException("invalid container id, " + id);
}
Container container = nmContext.getContainers().get(containerId);
if (container == null) {
throw new NotFoundException("container with id, " + id + ", not found");
}
return new ContainerInfo(this.nmContext, container, uriInfo.getBaseUri()
.toString(), webapp.name(), hsr.getRemoteUser());
}
/**
* Returns log file's name as well as current file size for a container.
*
* @param hsr
* HttpServletRequest
* @param res
* HttpServletResponse
* @param containerIdStr
* The container ID
* @return
* The log file's name and current file size
*/
@GET
@Path("/containers/{containerid}/logs")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response getContainerLogsInfo(
@javax.ws.rs.core.Context HttpServletRequest hsr,
@javax.ws.rs.core.Context HttpServletResponse res,
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr) {
ContainerId containerId = null;
init();
try {
containerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException ex) {
throw new BadRequestException("invalid container id, " + containerIdStr);
}
try {
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
containersLogsInfo.add(new NMContainerLogsInfo(
this.nmContext, containerId,
hsr.getRemoteUser(), ContainerLogAggregationType.LOCAL));
// check whether we have aggregated logs in RemoteFS. If exists, show the
// the log meta for the aggregated logs as well.
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
Application app = this.nmContext.getApplications().get(appId);
String appOwner = app == null ? null : app.getUser();
try {
ContainerLogsRequest logRequest = new ContainerLogsRequest();
logRequest.setAppId(appId);
logRequest.setAppOwner(appOwner);
logRequest.setContainerId(containerIdStr);
logRequest.setNodeId(this.nmContext.getNodeId().toString());
List<ContainerLogMeta> containerLogMeta = factory
.getFileControllerForRead(appId, appOwner)
.readAggregatedLogsMeta(logRequest);
if (!containerLogMeta.isEmpty()) {
for (ContainerLogMeta logMeta : containerLogMeta) {
containersLogsInfo.add(new ContainerLogsInfo(logMeta,
ContainerLogAggregationType.AGGREGATED));
}
}
} catch (IOException ex) {
// Something wrong with we tries to access the remote fs for the logs.
// Skip it and do nothing
if (LOG.isDebugEnabled()) {
LOG.debug(ex.getMessage());
}
}
GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<
ContainerLogsInfo>>(containersLogsInfo){};
ResponseBuilder resp = Response.ok(meta);
// Sending the X-Content-Type-Options response header with the value
// nosniff will prevent Internet Explorer from MIME-sniffing a response
// away from the declared content-type.
resp.header("X-Content-Type-Options", "nosniff");
return resp.build();
} catch (Exception ex) {
if (redirectWSUrl == null || redirectWSUrl.isEmpty()) {
throw new WebApplicationException(ex);
}
// redirect the request to the configured log server
String redirectURI = "/containers/" + containerIdStr
+ "/logs";
return createRedirectResponse(hsr, redirectWSUrl, redirectURI);
}
}
/**
* Returns the contents of a container's log file in plain text.
*
* Only works for containers that are still in the NodeManager's memory, so
* logs are no longer available after the corresponding application is no
* longer running.
*
* @param containerIdStr
* The container ID
* @param filename
* The name of the log file
* @param format
* The content type
* @param size
* the size of the log file
* @return
* The contents of the container's log file
*/
@GET
@Path("/containers/{containerid}/logs/{filename}")
@Produces({ MediaType.TEXT_PLAIN })
@Public
@Unstable
public Response getContainerLogFile(
@PathParam(YarnWebServiceParams.CONTAINER_ID)
final String containerIdStr,
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME)
String filename,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT)
String format,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE)
String size) {
return getLogs(containerIdStr, filename, format, size);
}
/**
* Returns the contents of a container's log file in plain text.
*
* Only works for containers that are still in the NodeManager's memory, so
* logs are no longer available after the corresponding application is no
* longer running.
*
* @param containerIdStr
* The container ID
* @param filename
* The name of the log file
* @param format
* The content type
* @param size
* the size of the log file
* @return
* The contents of the container's log file
*/
@GET
@Path("/containerlogs/{containerid}/{filename}")
@Produces({ MediaType.TEXT_PLAIN })
@Public
@Unstable
public Response getLogs(
@PathParam(YarnWebServiceParams.CONTAINER_ID)
final String containerIdStr,
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME)
String filename,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT)
String format,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE)
String size) {
ContainerId tempContainerId;
try {
tempContainerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException ex) {
return Response.status(Status.BAD_REQUEST).entity(ex.getMessage()).build();
}
final ContainerId containerId = tempContainerId;
boolean tempIsRunning = false;
// check what is the status for container
try {
Container container = nmContext.getContainers().get(containerId);
tempIsRunning = (container.getContainerState() == ContainerState.RUNNING);
} catch (Exception ex) {
// This NM does not have this container any more. We
// assume the container has already finished.
if (LOG.isDebugEnabled()) {
LOG.debug("Can not find the container:" + containerId
+ " in this node.");
}
}
final boolean isRunning = tempIsRunning;
File logFile = null;
try {
logFile = ContainerLogsUtils.getContainerLogFile(
containerId, filename, request.getRemoteUser(), nmContext);
} catch (NotFoundException ex) {
if (redirectWSUrl == null || redirectWSUrl.isEmpty()) {
return Response.status(Status.NOT_FOUND).entity(ex.getMessage())
.build();
}
// redirect the request to the configured log server
String redirectURI = "/containers/" + containerIdStr
+ "/logs/" + filename;
return createRedirectResponse(request, redirectWSUrl, redirectURI);
} catch (YarnException ex) {
return Response.serverError().entity(ex.getMessage()).build();
}
final long bytes = parseLongParam(size);
final String lastModifiedTime = Times.format(logFile.lastModified());
final String outputFileName = filename;
String contentType = WebAppUtils.getDefaultLogContentType();
if (format != null && !format.isEmpty()) {
contentType = WebAppUtils.getSupportedLogContentType(format);
if (contentType == null) {
String errorMessage = "The valid values for the parameter : format "
+ "are " + WebAppUtils.listSupportedLogContentType();
return Response.status(Status.BAD_REQUEST).entity(errorMessage)
.build();
}
}
try {
final FileInputStream fis = ContainerLogsUtils.openLogFileForRead(
containerIdStr, logFile, nmContext);
final long fileLength = logFile.length();
StreamingOutput stream = new StreamingOutput() {
@Override
public void write(OutputStream os) throws IOException,
WebApplicationException {
try {
LogToolUtils.outputContainerLogThroughZeroCopy(
containerId.toString(), nmContext.getNodeId().toString(),
outputFileName, fileLength, bytes, lastModifiedTime, fis, os,
ContainerLogAggregationType.LOCAL);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogType:" + outputFileName;
sb.append(endOfFile + ".");
if (isRunning) {
sb.append("This log file belongs to a running container ("
+ containerIdStr + ") and so may not be complete." + "\n");
} else {
sb.append("\n");
}
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
// If we have aggregated logs for this container,
// output the aggregation logs as well.
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
Application app = nmContext.getApplications().get(appId);
String appOwner = app == null ? null : app.getUser();
try {
ContainerLogsRequest logRequest = new ContainerLogsRequest();
logRequest.setAppId(appId);
logRequest.setAppOwner(appOwner);
logRequest.setContainerId(containerId.toString());
logRequest.setNodeId(nmContext.getNodeId().toString());
logRequest.setBytes(bytes);
Set<String> logTypes = new HashSet<>();
logTypes.add(outputFileName);
logRequest.setLogTypes(logTypes);
factory.getFileControllerForRead(appId, appOwner)
.readAggregatedLogs(logRequest, os);
} catch (IOException ex) {
// Something wrong when we try to access the aggregated log.
if (LOG.isDebugEnabled()) {
LOG.debug("Can not access the aggregated log for "
+ "the container:" + containerId);
LOG.debug(ex.getMessage());
}
}
} finally {
IOUtils.closeQuietly(fis);
}
}
};
ResponseBuilder resp = Response.ok(stream);
resp.header("Content-Type", contentType);
// Sending the X-Content-Type-Options response header with the value
// nosniff will prevent Internet Explorer from MIME-sniffing a response
// away from the declared content-type.
resp.header("X-Content-Type-Options", "nosniff");
return resp.build();
} catch (IOException ex) {
return Response.serverError().entity(ex.getMessage()).build();
}
}
@GET
@Path("/resources/{resourcename}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Object getNMResourceInfo(
@PathParam("resourcename")
String resourceName) throws YarnException {
init();
ResourcePluginManager rpm = this.nmContext.getResourcePluginManager();
if (rpm != null && rpm.getNameToPlugins() != null) {
ResourcePlugin plugin = rpm.getNameToPlugins().get(resourceName);
if (plugin != null) {
NMResourceInfo nmResourceInfo = plugin.getNMResourceInfo();
if (nmResourceInfo != null) {
return nmResourceInfo;
}
}
}
return new NMResourceInfo();
}
private long parseLongParam(String bytes) {
if (bytes == null || bytes.isEmpty()) {
return Long.MAX_VALUE;
}
return Long.parseLong(bytes);
}
private Response createRedirectResponse(HttpServletRequest httpRequest,
String redirectWSUrlPrefix, String uri) {
// redirect the request to the configured log server
StringBuilder redirectPath = new StringBuilder();
if (redirectWSUrlPrefix.endsWith("/")) {
redirectWSUrlPrefix = redirectWSUrlPrefix.substring(0,
redirectWSUrlPrefix.length() - 1);
}
redirectPath.append(redirectWSUrlPrefix + uri);
// append all the request query parameters except nodeId parameter
String requestParams = WebAppUtils.removeQueryParams(httpRequest,
YarnWebServiceParams.NM_ID);
if (requestParams != null && !requestParams.isEmpty()) {
redirectPath.append("?" + requestParams + "&"
+ YarnWebServiceParams.REDIRECTED_FROM_NODE + "=true");
} else {
redirectPath.append("?" + YarnWebServiceParams.REDIRECTED_FROM_NODE
+ "=true");
}
ResponseBuilder res = Response.status(
HttpServletResponse.SC_TEMPORARY_REDIRECT);
res.header("Location", redirectPath.toString());
return res.build();
}
}