blob: 4ece06668eb53ebd39c788cced83d91d10f7c6fb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.daemon.supervisor;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.BindException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.Constants;
import backtype.storm.daemon.Shutdownable;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.daemon.worker.Worker;
import com.alibaba.jstorm.utils.FileAttribute;
import com.alibaba.jstorm.utils.HttpserverUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.Pair;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeFormat;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
/**
* @author Johnfang (xiaojian.fxj@alibaba-inc.com)
*/
public class Httpserver implements Shutdownable {
private static Logger LOG = LoggerFactory.getLogger(Httpserver.class);
private HttpServer hs;
private int port;
private Map conf;
public Httpserver(int port, Map conf) {
this.port = port;
this.conf = conf;
}
static class LogHandler implements HttpHandler {
private String logDir;
private String stormHome;
private ArrayList<String> accessDirs = new ArrayList<String>();
Map conf;
private final int pageSize;
public LogHandler(Map conf) {
this.pageSize = ConfigExtension.getLogPageSize(conf);
logDir = JStormUtils.getLogDir();
String logDirPath = PathUtils.getCanonicalPath(logDir);
if (logDirPath == null) {
accessDirs.add(logDir);
} else {
accessDirs.add(logDirPath);
}
stormHome = System.getProperty("jstorm.home");
if (stormHome != null) {
String stormHomePath = PathUtils.getCanonicalPath(stormHome);
if (stormHomePath == null) {
accessDirs.add(stormHome);
} else {
accessDirs.add(stormHomePath);
}
}
String confDir = System.getProperty(Constants.JSTORM_CONF_DIR);
if (StringUtils.isBlank(confDir) == false) {
String confDirPath = PathUtils.getCanonicalPath(confDir);
if (confDirPath != null) {
accessDirs.add(confDirPath);
}
}
this.conf = conf;
LOG.info("logview logDir=" + logDir); // +++
}
public void handlFailure(HttpExchange t, String errorMsg) throws IOException {
LOG.error(errorMsg);
byte[] data = errorMsg.getBytes();
t.sendResponseHeaders(HttpURLConnection.HTTP_BAD_REQUEST, data.length);
OutputStream os = t.getResponseBody();
os.write(data);
os.close();
}
public void handle(HttpExchange t) throws IOException {
URI uri = t.getRequestURI();
Map<String, String> paramMap = parseRawQuery(uri.getRawQuery());
LOG.info("Receive command " + paramMap);
String cmd = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD);
if (StringUtils.isBlank(cmd) == true) {
handlFailure(t, "Bad Request, Not set command type");
return;
}
if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW.equals(cmd)) {
handleShowLog(t, paramMap);
return;
} else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_LIST.equals(cmd)) {
handleListDir(t, paramMap);
return;
} else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK.equals(cmd)) {
handleJstack(t, paramMap);
return;
} else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF.equals(cmd)) {
handleShowConf(t, paramMap);
return;
}
handlFailure(t, "Bad Request, Not support command type " + cmd);
return;
}
private void accessCheck(String fileName) throws IOException {
File file = new File(fileName);
String canonicalPath = file.getCanonicalPath();
boolean isChild = false;
for (String dir : accessDirs) {
if (canonicalPath.indexOf(dir) >= 0) {
isChild = true;
break;
}
}
if (isChild == false) {
LOG.error("Access one disallowed path: " + canonicalPath);
throw new IOException("Destination file/path is not accessible.");
}
}
private Map<String, String> parseRawQuery(String uriRawQuery) {
Map<String, String> paramMap = Maps.newHashMap();
for (String param : StringUtils.split(uriRawQuery, "&")) {
String[] pair = StringUtils.split(param, "=");
if (pair.length == 2) {
paramMap.put(pair[0], pair[1]);
}
}
return paramMap;
}
private void handleShowLog(HttpExchange t, Map<String, String> paramMap) throws IOException {
Pair<Long, byte[]> logPair = queryLog(t, paramMap);
if (logPair == null) {
return;
}
String size = String.format(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT, logPair.getFirst());
byte[] sizeByts = size.getBytes();
byte[] logData = logPair.getSecond();
t.sendResponseHeaders(HttpURLConnection.HTTP_OK, sizeByts.length + logData.length);
OutputStream os = t.getResponseBody();
os.write(sizeByts);
os.write(logData);
os.close();
}
private Pair<Long, byte[]> queryLog(HttpExchange t, Map<String, String> paramMap) throws IOException {
String fileParam = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_LOGFILE);
if (StringUtils.isBlank(fileParam)) {
handlFailure(t, "Bad Request, Params Error, no log file name.");
return null;
}
String logFile = Joiner.on(File.separator).join(logDir, fileParam);
accessCheck(logFile);
FileChannel fc = null;
MappedByteBuffer fout = null;
long fileSize = 0;
byte[] ret = "Failed to get data".getBytes();
try {
fc = new RandomAccessFile(logFile, "r").getChannel();
fileSize = fc.size();
long position = fileSize - pageSize;
try {
String posStr = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_POS);
if (StringUtils.isBlank(posStr) == false) {
long pos = Long.valueOf(posStr);
position = pos;
}
} catch (Exception e) {
LOG.warn("Invalide position " + position);
}
if (position < 0) {
position = 0L;
}
long size = Math.min(fileSize - position, pageSize);
LOG.info("logview " + logFile + ", position=" + position + ", size=" + size);
fout = fc.map(FileChannel.MapMode.READ_ONLY, position, size);
ret = new byte[(int) size];
fout.get(ret);
String str = new String(ret, ConfigExtension.getLogViewEncoding(conf));
return new Pair<Long, byte[]>(fileSize, str.getBytes());
} catch (FileNotFoundException e) {
LOG.warn(e.getMessage(), e);
handlFailure(t, "Bad Request, Failed to find " + fileParam);
return null;
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
handlFailure(t, "Bad Request, Failed to open " + fileParam);
return null;
} finally {
fout = null;
if (fc != null) {
IOUtils.closeQuietly(fc);
}
}
}
byte[] getJSonFiles(String dir) throws Exception {
Map<String, FileAttribute> fileMap = new HashMap<String, FileAttribute>();
String path = logDir;
if (dir != null) {
path = path + File.separator + dir;
}
accessCheck(path);
LOG.info("List dir " + path);
File file = new File(path);
String[] files = file.list();
for (String fileName : files) {
String logFile = Joiner.on(File.separator).join(path, fileName);
FileAttribute fileAttribute = new FileAttribute();
fileAttribute.setFileName(fileName);
File subFile = new File(logFile);
Date modify = new Date(subFile.lastModified());
fileAttribute.setModifyTime(TimeFormat.getSecond(modify));
if (subFile.isFile()) {
fileAttribute.setIsDir(String.valueOf(false));
fileAttribute.setSize(String.valueOf(subFile.length()));
fileMap.put(logFile, fileAttribute);
} else if (subFile.isDirectory()) {
fileAttribute.setIsDir(String.valueOf(true));
fileAttribute.setSize(String.valueOf(4096));
fileMap.put(logFile, fileAttribute);
}
}
String fileJsonStr = JStormUtils.to_json(fileMap);
return fileJsonStr.getBytes();
}
void handleListDir(HttpExchange t, Map<String, String> paramMap) throws IOException {
byte[] filesJson = "Failed to get file list".getBytes();
try {
String dir = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_DIR);
filesJson = getJSonFiles(dir);
} catch (Exception e) {
LOG.error("Failed to list files", e);
handlFailure(t, "Failed to get file list");
return;
}
t.sendResponseHeaders(HttpURLConnection.HTTP_OK, filesJson.length);
OutputStream os = t.getResponseBody();
os.write(filesJson);
os.close();
}
void handleJstack(StringBuffer sb, Integer pid) {
String cmd = "jstack " + pid;
try {
LOG.info("Begin to execute " + cmd);
Process process = JStormUtils.launch_process(cmd, new HashMap<String, String>(), false);
// Process process = Runtime.getRuntime().exec(sb.toString());
InputStream stdin = process.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
JStormUtils.sleepMs(1000);
// if (process.exitValue() != 0) {
// LOG.info("Failed to execute " + sb.toString());
// return null;
// }
String str;
while ((str = reader.readLine()) != null) {
if (StringUtils.isBlank(str)) {
// LOG.info(str + " is Blank");
continue;
}
sb.append(str).append("\r\n");
}
LOG.info("Successfully get output of " + cmd);
return;
} catch (IOException e) {
LOG.info("Failed to execute " + cmd, e);
sb.append("Failed to execute " + cmd);
return;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
sb.append("Failed to execute " + cmd + ", " + e.getCause());
return;
}
}
void handleJstack(HttpExchange t, Map<String, String> paramMap) throws IOException {
String workerPort = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT);
if (workerPort == null) {
handlFailure(t, "Not set worker's port");
return;
}
LOG.info("Begin to get jstack of " + workerPort);
StringBuffer sb = new StringBuffer();
List<Integer> pids = Worker.getOldPortPids(workerPort);
for (Integer pid : pids) {
sb.append("!!!!!!!!!!!!!!!!!!\r\n");
sb.append("WorkerPort:" + workerPort + ", pid:" + pid);
sb.append("\r\n!!!!!!!!!!!!!!!!!!\r\n");
handleJstack(sb, pid);
}
byte[] data = sb.toString().getBytes();
t.sendResponseHeaders(HttpURLConnection.HTTP_OK, data.length);
OutputStream os = t.getResponseBody();
os.write(data);
os.close();
}
void handleShowConf(HttpExchange t, Map<String, String> paramMap) throws IOException {
byte[] json = "Failed to get configuration".getBytes();
try {
String tmp = Utils.to_json(conf);
json = tmp.getBytes();
} catch (Exception e) {
LOG.error("Failed to get configuration", e);
handlFailure(t, "Failed to get configuration");
return;
}
t.sendResponseHeaders(HttpURLConnection.HTTP_OK, json.length);
OutputStream os = t.getResponseBody();
os.write(json);
os.close();
}
}// LogHandler
public void start() {
int numHandler = 3;
InetSocketAddress socketAddr = new InetSocketAddress(port);
Executor executor = Executors.newFixedThreadPool(numHandler);
try {
hs = HttpServer.create(socketAddr, 0);
hs.createContext(HttpserverUtils.HTTPSERVER_CONTEXT_PATH_LOGVIEW, new LogHandler(conf));
hs.setExecutor(executor);
hs.start();
} catch (BindException e) {
LOG.info("Httpserver Already start!");
hs = null;
return;
} catch (IOException e) {
LOG.error("Httpserver Start Failed", e);
hs = null;
return;
}
LOG.info("Success start httpserver at port:" + port);
}
@Override
public void shutdown() {
if (hs != null) {
hs.stop(0);
LOG.info("Successfully stop http server");
}
}
}