blob: dfbe53a9dddf8d962d0066a5174d2fbb4ee6dace [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.collector.servlet;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
import java.io.*;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import org.apache.hadoop.chukwa.*;
import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
import org.apache.hadoop.conf.Configuration;
@Deprecated
public class LogDisplayServlet extends HttpServlet {
/*
static class StreamName {
byte[] md5;
public StreamName(Chunk c) {
}
@Override
public int hashCode() {
int x=0;
for(int i=0; i< md5.length; ++i) {
x ^= (md5[i] << 4 * i);
}
return x;
}
public boolean equals(Object x) {
if(x instanceof StreamName)
return Arrays.equals(md5, ((StreamName)x).md5);
else return false;
}
}*/
public static final String DEFAULT_PATH = "logs";
public static final String ENABLED_OPT = "chukwaCollector.showLogs.enabled";
public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer";
long BUF_SIZE = 1024* 1024;
transient Configuration conf;
transient Map<String, Deque<Chunk>> chunksBySID;
Queue<String> receivedSIDs = new LinkedList<String>();
long totalStoredSize = 0;
private static final long serialVersionUID = -4602082382919009285L;
protected final static Logger log = Logger.getLogger(LogDisplayServlet.class);
public LogDisplayServlet() {
conf = new Configuration();
chunksBySID = new HashMap<String, Deque<Chunk>>();
ExtractorWriter.setRecipient(this);
}
public LogDisplayServlet(Configuration c) {
conf = c;
chunksBySID = new HashMap<String, Deque<Chunk>>();
ExtractorWriter.setRecipient(this);
}
public LogDisplayServlet(Configuration c, Map<String, Deque<Chunk>> chunksBySID) {
conf = c;
this.chunksBySID = chunksBySID;
ExtractorWriter.setRecipient(this);
}
public void init(ServletConfig servletConf) throws ServletException {
BUF_SIZE = conf.getLong(BUF_SIZE_OPT, BUF_SIZE);
}
@Override
protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
}
private String getSID(Chunk c) {
try {
MessageDigest md;
md = MessageDigest.getInstance("MD5");
md.update(c.getSource().getBytes(Charset.forName("UTF-8")));
md.update(c.getStreamName().getBytes(Charset.forName("UTF-8")));
md.update(c.getTags().getBytes(Charset.forName("UTF-8")));
StringBuilder sb = new StringBuilder();
byte[] bytes = md.digest();
for(int i=0; i < bytes.length; ++i) {
if( (bytes[i] & 0xF0) == 0)
sb.append('0');
sb.append( Integer.toHexString(0xFF & bytes[i]) );
}
return sb.toString();
} catch(NoSuchAlgorithmException n) {
log.fatal(n);
return null;
}
}
private void pruneOldEntries() {
while(totalStoredSize > BUF_SIZE) {
String queueToPrune = receivedSIDs.remove();
Deque<Chunk> stream = chunksBySID.get(queueToPrune);
assert !stream.isEmpty() : " expected a chunk in stream with ID " + queueToPrune;
Chunk c = stream.poll();
if(c != null)
totalStoredSize -= c.getData().length;
if(stream.isEmpty()) { //remove empty deques and their names.
chunksBySID.remove(queueToPrune);
}
}
}
public synchronized void add(List<Chunk> chunks) {
for(Chunk c : chunks) {
String sid = getSID(c);
Deque<Chunk> stream = chunksBySID.get(sid);
if(stream == null) {
stream = new LinkedList<Chunk>();
chunksBySID.put(sid, stream);
}
stream.add(c);
receivedSIDs.add(sid);
totalStoredSize += c.getData().length;
}
pruneOldEntries();
}
@Override
protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()), true, "UTF-8");
resp.setStatus(200);
String path = req.getServletPath();
String streamID = req.getParameter("sid");
if (streamID != null) {
try {
Deque<Chunk> chunks = chunksBySID.get(streamID);
if(chunks != null) {
String streamName = getFriendlyName(chunks.peek());
out.println("<html><title>Chukwa:Received Data</title><body><h2>Data from "+ streamName + "</h2>");
out.println("<pre>");
for(Chunk c: chunks) {
out.write(c.getData());
}
out.println("</pre><hr><a href=\""+path+"\">Back to list of streams</a>");
} else
out.println("No data");
} catch(Exception e) {
out.println("<html><body>No data</body></html>");
}
out.println("</body></html>");
} else {
out.println("<html><title>Chukwa:Received Data</title><body><h2>Recently-seen streams</h2><ul>");
for(Map.Entry<String, Deque<Chunk>> sid: chunksBySID.entrySet())
out.println("<li> <a href=\"" + path + "?sid="+sid.getKey() + "\">"+ getFriendlyName(sid.getValue().peek()) + "</a></li>");
out.println("</ul></body></html>");
}
out.flush();
}
private String getFriendlyName(Chunk chunk) {
if(chunk != null)
return chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName();
else return "null";
}
}