blob: acbc0f1562e658e954098e270f3bd5053cdd8462 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.webserver;
import org.apache.reef.driver.ProgressProvider;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.parameters.ClientCloseHandlers;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.util.logging.LogLevelName;
import org.apache.reef.util.logging.LogParser;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.util.logging.LoggingScopeImpl;
import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Http handler for REEF events.
*/
public final class HttpServerReefEventHandler implements HttpHandler {
private static final Logger LOG = Logger.getLogger(HttpServerReefEventHandler.class.getName());
private static final String VER = "v1";
private final String driverStdoutFile;
private final String driverStderrFile;
private final ReefEventStateManager reefStateManager;
private final Set<EventHandler<Void>> clientCloseHandlers;
private final LoggingScopeFactory loggingScopeFactory;
private final InjectionFuture<ProgressProvider> progressProvider;
/**
* Log level string prefix in the log lines.
*/
private final String logLevelPrefix;
/**
* specification that would match URI request.
*/
private String uriSpecification = "Reef";
@Inject
public HttpServerReefEventHandler(
final ReefEventStateManager reefStateManager,
@Parameter(ClientCloseHandlers.class) final Set<EventHandler<Void>> clientCloseHandlers,
@Parameter(LogLevelName.class) final String logLevel,
final LoggingScopeFactory loggingScopeFactory,
final REEFFileNames reefFileNames,
final InjectionFuture<ProgressProvider> progressProvider) {
this.reefStateManager = reefStateManager;
this.clientCloseHandlers = clientCloseHandlers;
this.loggingScopeFactory = loggingScopeFactory;
this.logLevelPrefix = new StringBuilder().append(logLevel).append(": ").toString();
this.progressProvider = progressProvider;
driverStdoutFile = reefFileNames.getDriverStdoutFileName();
driverStderrFile = reefFileNames.getDriverStderrFileName();
}
/**
* read a file and output it as a String.
*/
private static String readFile(final String fileName) throws IOException {
return new String(Files.readAllBytes(Paths.get(fileName)), StandardCharsets.UTF_8);
}
/**
* @return URI specification for the handler.
*/
@Override
public String getUriSpecification() {
return uriSpecification;
}
/**
* set URI specification.
*/
public void setUriSpecification(final String s) {
uriSpecification = s;
}
/**
* Event handler that is called when receiving a http request.
*/
@Override
public void onHttpRequest(
final ParsedHttpRequest parsedHttpRequest,
final HttpServletResponse response) throws IOException, ServletException {
LOG.log(Level.INFO, "HttpServerReefEventHandler in webserver onHttpRequest is called: {0}",
parsedHttpRequest.getRequestUri());
final String version = parsedHttpRequest.getVersion().toLowerCase();
final String target = parsedHttpRequest.getTargetEntity().toLowerCase();
switch (target) {
case "evaluators": {
final String queryStr = parsedHttpRequest.getQueryString();
if (queryStr == null || queryStr.isEmpty()) {
if (version.equals(VER)) {
writeEvaluatorsJsonOutput(response);
} else {
writeEvaluatorsWebOutput(response);
}
} else {
handleQueries(response, parsedHttpRequest.getQueryMap(), version);
}
break;
}
case "driver":
if (version.equals(VER)) {
writeDriverJsonInformation(response);
} else {
writeDriverWebInformation(response);
}
break;
case "close":
for (final EventHandler<Void> e : clientCloseHandlers) {
e.onNext(null);
}
response.getWriter().println("Enforced closing");
break;
case "kill":
reefStateManager.onClientKill();
response.getWriter().println("Killing");
break;
case "duration":
final ArrayList<String> lines =
LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.DURATION, LoggingScopeImpl.TOKEN, null);
writeLines(response, lines, "Performance...");
break;
case "stages":
final ArrayList<String> starts =
LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.START_PREFIX, logLevelPrefix, null);
final ArrayList<String> exits =
LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.EXIT_PREFIX, logLevelPrefix,
LoggingScopeImpl.DURATION);
final ArrayList<String> startsStages = LogParser.findStages(starts, LogParser.START_INDICATORS);
final ArrayList<String> endStages = LogParser.findStages(exits, LogParser.END_INDICATORS);
final ArrayList<String> result = LogParser.mergeStages(startsStages, endStages);
writeLines(response, result, "Current Stages...");
break;
case "logfile":
final List<String> names = parsedHttpRequest.getQueryMap().get("filename");
final PrintWriter writer = response.getWriter();
if (names == null || names.size() == 0) {
writer.println("File name is not provided");
} else {
final String fileName = names.get(0);
if (!fileName.equals(driverStdoutFile) && !fileName.equals(driverStderrFile)) {
writer.println(String.format("Unsupported file names: [%s] ", fileName));
} else {
try {
final byte[] outputBody = readFile(fileName).getBytes(StandardCharsets.UTF_8);
writer.print(Arrays.toString(outputBody));
} catch (final IOException e) {
writer.println(String.format("Cannot find the log file: [%s].", fileName));
}
}
}
break;
case "progress":
response.getWriter().println(progressProvider.get().getProgress());
break;
default:
response.getWriter().println(String.format("Unsupported query for entity: [%s].", target));
}
}
/**
* handle HTTP queries.
* Example of a query: http://localhost:8080/reef/Evaluators/?id=Node-2-1403225213803&id=Node-1-1403225213712
*/
private void handleQueries(
final HttpServletResponse response,
final Map<String, List<String>> queries,
final String version) throws IOException {
LOG.log(Level.INFO, "HttpServerReefEventHandler handleQueries is called");
for (final Map.Entry<String, List<String>> entry : queries.entrySet()) {
final String queryTarget = entry.getKey().toLowerCase();
switch (queryTarget) {
case "id":
if (version.equals(VER)) {
writeEvaluatorInfoJsonOutput(response, entry.getValue());
} else {
writeEvaluatorInfoWebOutput(response, entry.getValue());
}
break;
default:
response.getWriter().println("Unsupported query : " + queryTarget);
break;
}
}
}
/**
* Write Evaluator info as JSON format to HTTP Response.
*/
private void writeEvaluatorInfoJsonOutput(
final HttpServletResponse response, final List<String> ids) throws IOException {
try {
final EvaluatorInfoSerializer serializer =
Tang.Factory.getTang().newInjector().getInstance(EvaluatorInfoSerializer.class);
final AvroEvaluatorsInfo evaluatorsInfo =
serializer.toAvro(ids, this.reefStateManager.getEvaluators());
writeResponse(response, serializer.toString(evaluatorsInfo));
} catch (final InjectionException e) {
LOG.log(Level.SEVERE, "Error in injecting EvaluatorInfoSerializer.", e);
writeResponse(response, "Error in injecting EvaluatorInfoSerializer: " + e);
}
}
/**
* Write Evaluator info on the Response so that to display on web page directly.
* This is for direct browser queries.
*/
private void writeEvaluatorInfoWebOutput(
final HttpServletResponse response, final List<String> ids) throws IOException {
for (final String id : ids) {
final EvaluatorDescriptor evaluatorDescriptor = this.reefStateManager.getEvaluators().get(id);
final PrintWriter writer = response.getWriter();
if (evaluatorDescriptor != null) {
final String nodeId = evaluatorDescriptor.getNodeDescriptor().getId();
final String nodeName = evaluatorDescriptor.getNodeDescriptor().getName();
final InetSocketAddress address =
evaluatorDescriptor.getNodeDescriptor().getInetSocketAddress();
writer.println("Evaluator Id: " + id);
writer.write("<br/>");
writer.println("Evaluator Node Id: " + nodeId);
writer.write("<br/>");
writer.println("Evaluator Node Name: " + nodeName);
writer.write("<br/>");
writer.println("Evaluator InternetAddress: " + address);
writer.write("<br/>");
writer.println("Evaluator Memory: " + evaluatorDescriptor.getMemory());
writer.write("<br/>");
writer.println("Evaluator Core: " + evaluatorDescriptor.getNumberOfCores());
writer.write("<br/>");
writer.println("Evaluator Type: " + evaluatorDescriptor.getProcess());
writer.write("<br/>");
writer.println("Evaluator Runtime Name: " + evaluatorDescriptor.getRuntimeName());
writer.write("<br/>");
} else {
writer.println("Incorrect Evaluator Id: " + id);
}
}
}
/**
* Get all evaluator ids and send it back to response as JSON.
*/
private void writeEvaluatorsJsonOutput(final HttpServletResponse response) throws IOException {
LOG.log(Level.INFO, "HttpServerReefEventHandler writeEvaluatorsJsonOutput is called");
try {
final EvaluatorListSerializer serializer =
Tang.Factory.getTang().newInjector().getInstance(EvaluatorListSerializer.class);
final AvroEvaluatorList evaluatorList = serializer.toAvro(
this.reefStateManager.getEvaluators(), this.reefStateManager.getEvaluators().size(),
this.reefStateManager.getStartTime());
writeResponse(response, serializer.toString(evaluatorList));
} catch (final InjectionException e) {
LOG.log(Level.SEVERE, "Error in injecting EvaluatorListSerializer.", e);
writeResponse(response, "Error in injecting EvaluatorListSerializer: " + e);
}
}
/**
* Get all evaluator ids and send it back to response so that can be displayed on web.
*
* @param response
* @throws IOException
*/
private void writeEvaluatorsWebOutput(final HttpServletResponse response) throws IOException {
LOG.log(Level.INFO, "HttpServerReefEventHandler writeEvaluatorsWebOutput is called");
final PrintWriter writer = response.getWriter();
writer.println("<h1>Evaluators:</h1>");
for (final Map.Entry<String, EvaluatorDescriptor> entry
: this.reefStateManager.getEvaluators().entrySet()) {
final String key = entry.getKey();
final EvaluatorDescriptor descriptor = entry.getValue();
writer.println("Evaluator Id: " + key);
writer.write("<br/>");
writer.println("Evaluator Name: " + descriptor.getNodeDescriptor().getName());
writer.write("<br/>");
}
writer.write("<br/>");
writer.println("Total number of Evaluators: " + this.reefStateManager.getEvaluators().size());
writer.write("<br/>");
writer.println(String.format("Driver Start Time:[%s]", this.reefStateManager.getStartTime()));
}
/**
* Write Driver Info as JSON string to Response.
*/
private void writeDriverJsonInformation(final HttpServletResponse response) throws IOException {
LOG.log(Level.INFO, "HttpServerReefEventHandler writeDriverJsonInformation invoked.");
try {
final DriverInfoSerializer serializer =
Tang.Factory.getTang().newInjector().getInstance(DriverInfoSerializer.class);
final AvroDriverInfo driverInfo = serializer.toAvro(
this.reefStateManager.getDriverEndpointIdentifier(), this.reefStateManager.getStartTime(),
this.reefStateManager.getServicesInfo());
writeResponse(response, serializer.toString(driverInfo));
} catch (final InjectionException e) {
LOG.log(Level.SEVERE, "Error in injecting DriverInfoSerializer.", e);
writeResponse(response, "Error in injecting DriverInfoSerializer: " + e);
}
}
/**
* Write a String to HTTP Response.
*/
private void writeResponse(final HttpServletResponse response, final String data) throws IOException {
final byte[] outputBody = data.getBytes(StandardCharsets.UTF_8);
response.getOutputStream().write(outputBody);
}
/**
* Get driver information.
*/
private void writeDriverWebInformation(final HttpServletResponse response) throws IOException {
LOG.log(Level.INFO, "HttpServerReefEventHandler writeDriverWebInformation invoked.");
final PrintWriter writer = response.getWriter();
writer.println("<h1>Driver Information:</h1>");
writer.println(String.format("Driver Remote Identifier:[%s]",
this.reefStateManager.getDriverEndpointIdentifier()));
writer.write("<br/><br/>");
writer.println(String.format("Services registered on Driver:"));
writer.write("<br/><br/>");
for (final AvroReefServiceInfo service : this.reefStateManager.getServicesInfo()) {
writer.println(String.format("Service: [%s] , Information: [%s]", service.getServiceName(),
service.getServiceInfo()));
writer.write("<br/><br/>");
}
writer.println(String.format("Driver Start Time:[%s]", this.reefStateManager.getStartTime()));
}
/**
* Write lines in ArrayList to the response writer.
* @param response
* @param lines
* @param header
* @throws IOException
*/
private void writeLines(final HttpServletResponse response, final ArrayList<String> lines, final String header)
throws IOException {
LOG.log(Level.INFO, "HttpServerReefEventHandler writeLines is called");
final PrintWriter writer = response.getWriter();
writer.println("<h1>" + header + "</h1>");
for (final String line : lines) {
writer.println(line);
writer.write("<br/>");
}
writer.write("<br/>");
}
}